risingwave_backup/
meta_snapshot_v1.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17
18use bytes::{Buf, BufMut};
19use itertools::Itertools;
20use risingwave_common::util::iter_util::ZipEqFast;
21use risingwave_hummock_sdk::version::HummockVersion;
22use risingwave_pb::catalog::{
23    Connection, Database, Function, Index, Schema, Secret, Sink, Source, Subscription, Table, View,
24};
25use risingwave_pb::hummock::{CompactionGroup, HummockVersionStats, PbHummockVersion};
26use risingwave_pb::meta::{SystemParams, TableFragments};
27use risingwave_pb::user::UserInfo;
28
29use crate::error::{BackupError, BackupResult};
30use crate::meta_snapshot::{MetaSnapshot, Metadata};
31
// TODO: remove `ClusterMetadata`, and possibly the trait as well, once model v2 is applied.
/// Meta snapshot whose metadata payload is [`ClusterMetadata`].
pub type MetaSnapshotV1 = MetaSnapshot<ClusterMetadata>;
34
35impl Display for ClusterMetadata {
36    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
37        writeln!(f, "default_cf:")?;
38        for (k, v) in &self.default_cf {
39            let key = String::from_utf8(k.clone()).unwrap();
40            writeln!(f, "{} {:x?}", key, v)?;
41        }
42        writeln!(f, "hummock_version:")?;
43        writeln!(f, "{:#?}", self.hummock_version)?;
44        writeln!(f, "version_stats:")?;
45        writeln!(f, "{:#?}", self.version_stats)?;
46        writeln!(f, "compaction_groups:")?;
47        writeln!(f, "{:#?}", self.compaction_groups)?;
48        writeln!(f, "database:")?;
49        writeln!(f, "{:#?}", self.database)?;
50        writeln!(f, "schema:")?;
51        writeln!(f, "{:#?}", self.schema)?;
52        writeln!(f, "table:")?;
53        writeln!(f, "{:#?}", self.table)?;
54        writeln!(f, "index:")?;
55        writeln!(f, "{:#?}", self.index)?;
56        writeln!(f, "sink:")?;
57        writeln!(f, "{:#?}", self.sink)?;
58        writeln!(f, "source:")?;
59        writeln!(f, "{:#?}", self.source)?;
60        writeln!(f, "view:")?;
61        writeln!(f, "{:#?}", self.view)?;
62        writeln!(f, "connection:")?;
63        writeln!(f, "{:#?}", self.connection)?;
64        writeln!(f, "table_fragments:")?;
65        writeln!(f, "{:#?}", self.table_fragments)?;
66        writeln!(f, "user_info:")?;
67        writeln!(f, "{:#?}", self.user_info)?;
68        writeln!(f, "function:")?;
69        writeln!(f, "{:#?}", self.function)?;
70        writeln!(f, "system_param:")?;
71        writeln!(f, "{:#?}", self.system_param)?;
72        writeln!(f, "cluster_id:")?;
73        writeln!(f, "{:#?}", self.cluster_id)?;
74        writeln!(f, "subscription:")?;
75        writeln!(f, "{:#?}", self.subscription)?;
76        writeln!(f, "secret:")?;
77        writeln!(f, "{:#?}", self.secret)?;
78        Ok(())
79    }
80}
81
82impl Metadata for ClusterMetadata {
83    fn encode_to(&self, buf: &mut Vec<u8>) -> BackupResult<()> {
84        self.encode_to(buf)
85    }
86
87    fn decode(buf: &[u8]) -> BackupResult<Self>
88    where
89        Self: Sized,
90    {
91        ClusterMetadata::decode(buf)
92    }
93
94    fn hummock_version_ref(&self) -> &HummockVersion {
95        &self.hummock_version
96    }
97
98    fn hummock_version(self) -> HummockVersion {
99        self.hummock_version
100    }
101}
102
/// Metadata payload of a [`MetaSnapshotV1`].
///
/// For backward compatibility, never remove fields and only append new fields. A new field must
/// be serialized after every existing one in `encode_to`. `decode` must also tolerate its absence,
/// so that snapshots written before the field existed still decode. `subscription` and `secret`
/// are handled this way.
///
/// Note: the declaration order below is NOT the serialization order; `encode_to` / `decode`
/// define the on-disk order (e.g. `table_fragments` and `user_info` are written before
/// `database`).
#[derive(Debug, Default, Clone, PartialEq)]
pub struct ClusterMetadata {
    /// Unlike other metadata that has implemented `MetadataModel`,
    /// `DEFAULT_COLUMN_FAMILY` stores various single row metadata, e.g. id offset and epoch offset.
    /// So we use `default_cf` to store raw KVs for them.
    /// Keys and values are arbitrary bytes; they are serialized as two parallel lists.
    pub default_cf: HashMap<Vec<u8>, Vec<u8>>,
    /// Serialized as `PbHummockVersion` and restored with
    /// `HummockVersion::from_persisted_protobuf`.
    pub hummock_version: HummockVersion,
    pub version_stats: HummockVersionStats,
    pub compaction_groups: Vec<CompactionGroup>,
    pub database: Vec<Database>,
    pub schema: Vec<Schema>,
    pub table: Vec<Table>,
    pub index: Vec<Index>,
    pub sink: Vec<Sink>,
    pub source: Vec<Source>,
    pub view: Vec<View>,
    pub table_fragments: Vec<TableFragments>,
    pub user_info: Vec<UserInfo>,
    pub function: Vec<Function>,
    pub connection: Vec<Connection>,
    pub system_param: SystemParams,
    /// Serialized as a single protobuf `String` message.
    pub cluster_id: String,
    /// Appended to the format after `cluster_id`; decoded as empty when absent.
    pub subscription: Vec<Subscription>,
    /// Appended to the format after `subscription`; decoded as empty when absent.
    pub secret: Vec<Secret>,
}
129
130impl ClusterMetadata {
131    pub fn encode_to(&self, buf: &mut Vec<u8>) -> BackupResult<()> {
132        let default_cf_keys = self.default_cf.keys().collect_vec();
133        let default_cf_values = self.default_cf.values().collect_vec();
134        Self::encode_prost_message_list(&default_cf_keys, buf);
135        Self::encode_prost_message_list(&default_cf_values, buf);
136        Self::encode_prost_message(&PbHummockVersion::from(&self.hummock_version), buf);
137        Self::encode_prost_message(&self.version_stats, buf);
138        Self::encode_prost_message_list(&self.compaction_groups.iter().collect_vec(), buf);
139        Self::encode_prost_message_list(&self.table_fragments.iter().collect_vec(), buf);
140        Self::encode_prost_message_list(&self.user_info.iter().collect_vec(), buf);
141        Self::encode_prost_message_list(&self.database.iter().collect_vec(), buf);
142        Self::encode_prost_message_list(&self.schema.iter().collect_vec(), buf);
143        Self::encode_prost_message_list(&self.table.iter().collect_vec(), buf);
144        Self::encode_prost_message_list(&self.index.iter().collect_vec(), buf);
145        Self::encode_prost_message_list(&self.sink.iter().collect_vec(), buf);
146        Self::encode_prost_message_list(&self.source.iter().collect_vec(), buf);
147        Self::encode_prost_message_list(&self.view.iter().collect_vec(), buf);
148        Self::encode_prost_message_list(&self.function.iter().collect_vec(), buf);
149        Self::encode_prost_message_list(&self.connection.iter().collect_vec(), buf);
150        Self::encode_prost_message(&self.system_param, buf);
151        Self::encode_prost_message(&self.cluster_id, buf);
152        Self::encode_prost_message_list(&self.subscription.iter().collect_vec(), buf);
153        Self::encode_prost_message_list(&self.secret.iter().collect_vec(), buf);
154        Ok(())
155    }
156
157    pub fn decode(mut buf: &[u8]) -> BackupResult<Self> {
158        let default_cf_keys: Vec<Vec<u8>> = Self::decode_prost_message_list(&mut buf)?;
159        let default_cf_values: Vec<Vec<u8>> = Self::decode_prost_message_list(&mut buf)?;
160        let default_cf = default_cf_keys
161            .into_iter()
162            .zip_eq_fast(default_cf_values.into_iter())
163            .collect();
164        let hummock_version =
165            HummockVersion::from_persisted_protobuf(&Self::decode_prost_message(&mut buf)?);
166        let version_stats = Self::decode_prost_message(&mut buf)?;
167        let compaction_groups: Vec<CompactionGroup> = Self::decode_prost_message_list(&mut buf)?;
168        let table_fragments: Vec<TableFragments> = Self::decode_prost_message_list(&mut buf)?;
169        let user_info: Vec<UserInfo> = Self::decode_prost_message_list(&mut buf)?;
170        let database: Vec<Database> = Self::decode_prost_message_list(&mut buf)?;
171        let schema: Vec<Schema> = Self::decode_prost_message_list(&mut buf)?;
172        let table: Vec<Table> = Self::decode_prost_message_list(&mut buf)?;
173        let index: Vec<Index> = Self::decode_prost_message_list(&mut buf)?;
174        let sink: Vec<Sink> = Self::decode_prost_message_list(&mut buf)?;
175        let source: Vec<Source> = Self::decode_prost_message_list(&mut buf)?;
176        let view: Vec<View> = Self::decode_prost_message_list(&mut buf)?;
177        let function: Vec<Function> = Self::decode_prost_message_list(&mut buf)?;
178        let connection: Vec<Connection> = Self::decode_prost_message_list(&mut buf)?;
179        let system_param: SystemParams = Self::decode_prost_message(&mut buf)?;
180        let cluster_id: String = Self::decode_prost_message(&mut buf)?;
181        let subscription: Vec<Subscription> =
182            Self::try_decode_prost_message_list(&mut buf).unwrap_or_else(|| Ok(vec![]))?;
183        let secret: Vec<Secret> =
184            Self::try_decode_prost_message_list(&mut buf).unwrap_or_else(|| Ok(vec![]))?;
185
186        Ok(Self {
187            default_cf,
188            hummock_version,
189            version_stats,
190            compaction_groups,
191            database,
192            schema,
193            table,
194            index,
195            sink,
196            source,
197            view,
198            table_fragments,
199            user_info,
200            function,
201            connection,
202            system_param,
203            cluster_id,
204            subscription,
205            secret,
206        })
207    }
208
209    fn encode_prost_message(message: &impl prost::Message, buf: &mut Vec<u8>) {
210        let encoded_message = message.encode_to_vec();
211        buf.put_u32_le(encoded_message.len() as u32);
212        buf.put_slice(&encoded_message);
213    }
214
215    fn decode_prost_message<T>(buf: &mut &[u8]) -> BackupResult<T>
216    where
217        T: prost::Message + Default,
218    {
219        let len = buf.get_u32_le() as usize;
220        let v = buf[..len].to_vec();
221        buf.advance(len);
222        T::decode(v.as_slice()).map_err(|e| BackupError::Decoding(e.into()))
223    }
224
225    fn encode_prost_message_list(messages: &[&impl prost::Message], buf: &mut Vec<u8>) {
226        buf.put_u32_le(messages.len() as u32);
227        for message in messages {
228            Self::encode_prost_message(*message, buf);
229        }
230    }
231
232    fn decode_prost_message_list<T>(buf: &mut &[u8]) -> BackupResult<Vec<T>>
233    where
234        T: prost::Message + Default,
235    {
236        let vec_len = buf.get_u32_le() as usize;
237        let mut result = vec![];
238        for _ in 0..vec_len {
239            let v: T = Self::decode_prost_message(buf)?;
240            result.push(v);
241        }
242        Ok(result)
243    }
244
245    fn try_decode_prost_message_list<T>(buf: &mut &[u8]) -> Option<BackupResult<Vec<T>>>
246    where
247        T: prost::Message + Default,
248    {
249        if buf.is_empty() {
250            return None;
251        }
252        Some(Self::decode_prost_message_list(buf))
253    }
254}
255
#[cfg(test)]
mod tests {
    use risingwave_hummock_sdk::HummockVersionId;
    use risingwave_pb::catalog::{Secret, Subscription};
    use risingwave_pb::hummock::{CompactionGroup, TableStats};

    use crate::meta_snapshot_v1::{ClusterMetadata, MetaSnapshotV1};

    type MetaSnapshot = MetaSnapshotV1;

    #[test]
    fn test_snapshot_encoding_decoding() {
        let mut metadata = ClusterMetadata::default();
        metadata.hummock_version.id = HummockVersionId::new(321);
        let raw = MetaSnapshot {
            format_version: 0,
            id: 123,
            metadata,
        };
        let encoded = raw.encode().unwrap();
        let decoded = MetaSnapshot::decode(&encoded).unwrap();
        assert_eq!(raw, decoded);
    }

    #[test]
    fn test_metadata_encoding_decoding() {
        let mut buf = vec![];
        let mut raw = ClusterMetadata::default();
        raw.default_cf.insert(vec![0, 1, 2], vec![3, 4, 5]);
        raw.hummock_version.id = HummockVersionId::new(1);
        raw.version_stats.hummock_version_id = 10;
        raw.version_stats.table_stats.insert(
            200,
            TableStats {
                total_key_count: 1000,
                ..Default::default()
            },
        );
        raw.compaction_groups.push(CompactionGroup {
            id: 3000,
            ..Default::default()
        });
        raw.encode_to(&mut buf).unwrap();
        let decoded = ClusterMetadata::decode(buf.as_slice()).unwrap();
        assert_eq!(raw, decoded);
    }

    /// Non-empty optional trailing lists (`subscription`, `secret`) survive a round trip.
    #[test]
    fn test_metadata_optional_fields_round_trip() {
        let mut raw = ClusterMetadata::default();
        raw.cluster_id = "test-cluster".to_owned();
        raw.subscription.push(Subscription {
            name: "sub".to_owned(),
            ..Default::default()
        });
        raw.secret.push(Secret {
            name: "secret".to_owned(),
            ..Default::default()
        });
        let mut buf = vec![];
        raw.encode_to(&mut buf).unwrap();
        let decoded = ClusterMetadata::decode(buf.as_slice()).unwrap();
        assert_eq!(raw, decoded);
    }

    /// A snapshot written before `subscription`/`secret` existed still decodes, with both
    /// lists defaulting to empty.
    #[test]
    fn test_metadata_decoding_without_optional_fields() {
        let mut raw = ClusterMetadata::default();
        raw.cluster_id = "test-cluster".to_owned();
        let mut buf = vec![];
        raw.encode_to(&mut buf).unwrap();
        // The empty `subscription` and `secret` lists are the last two fields. Each is encoded
        // as a bare little-endian `u32` count of zero, so dropping those bytes mimics an
        // older snapshot.
        let tail_len = 2 * std::mem::size_of::<u32>();
        assert!(buf[buf.len() - tail_len..].iter().all(|&b| b == 0));
        buf.truncate(buf.len() - tail_len);
        let decoded = ClusterMetadata::decode(buf.as_slice()).unwrap();
        assert_eq!(raw, decoded);
    }
}