risingwave_backup/
meta_snapshot_v2.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{Display, Formatter};
16
17use bytes::{Buf, BufMut};
18use risingwave_hummock_sdk::version::HummockVersion;
19use serde::{Deserialize, Serialize};
20
21use crate::meta_snapshot::{MetaSnapshot, Metadata};
22use crate::{BackupError, BackupResult};
23pub type MetaSnapshotV2 = MetaSnapshot<MetadataV2>;
24
25impl From<serde_json::Error> for BackupError {
26    fn from(value: serde_json::Error) -> Self {
27        BackupError::Other(value.into())
28    }
29}
30
31/// Add new item in the end. Do not change the order.
32#[macro_export]
33macro_rules! for_all_metadata_models_v2 {
34    ($macro:ident) => {
35        $macro! {
36            {seaql_migrations, risingwave_meta_model::serde_seaql_migration},
37            {version_stats, risingwave_meta_model::hummock_version_stats},
38            {compaction_configs, risingwave_meta_model::compaction_config},
39            {clusters, risingwave_meta_model::cluster},
40            {fragment_relation, risingwave_meta_model::fragment_relation},
41            {catalog_versions, risingwave_meta_model::catalog_version},
42            {connections, risingwave_meta_model::connection},
43            {databases, risingwave_meta_model::database},
44            {fragments, risingwave_meta_model::fragment},
45            {functions, risingwave_meta_model::function},
46            {indexes, risingwave_meta_model::index},
47            {objects, risingwave_meta_model::object},
48            {object_dependencies, risingwave_meta_model::object_dependency},
49            {schemas, risingwave_meta_model::schema},
50            {sinks, risingwave_meta_model::sink},
51            {sources, risingwave_meta_model::source},
52            {streaming_jobs, risingwave_meta_model::streaming_job},
53            {subscriptions, risingwave_meta_model::subscription},
54            {system_parameters, risingwave_meta_model::system_parameter},
55            {tables, risingwave_meta_model::table},
56            {users, risingwave_meta_model::user},
57            {user_privileges, risingwave_meta_model::user_privilege},
58            {views, risingwave_meta_model::view},
59            {workers, risingwave_meta_model::worker},
60            {worker_properties, risingwave_meta_model::worker_property},
61            {hummock_sequences, risingwave_meta_model::hummock_sequence},
62            {session_parameters, risingwave_meta_model::session_parameter},
63            {secrets, risingwave_meta_model::secret},
64            {exactly_once_iceberg_sinks, risingwave_meta_model::exactly_once_iceberg_sink},
65            {iceberg_tables, risingwave_meta_model::iceberg_tables},
66            {iceberg_namespace_properties, risingwave_meta_model::iceberg_namespace_properties},
67            {user_default_privilege, risingwave_meta_model::user_default_privilege},
68            {fragment_splits, risingwave_meta_model::fragment_splits},
69            {pending_sink_state, risingwave_meta_model::pending_sink_state}
70        }
71    };
72}
73
74macro_rules! define_metadata_v2 {
75    ($({ $name:ident, $mod_path:ident::$mod_name:ident }),*) => {
76        #[derive(Default)]
77        pub struct MetadataV2 {
78            pub hummock_version: HummockVersion,
79            $(
80                pub $name: Vec<$mod_path::$mod_name::Model>,
81            )*
82        }
83    };
84}
85
86for_all_metadata_models_v2!(define_metadata_v2);
87
88macro_rules! define_encode_metadata {
89    ($( {$name:ident, $mod_path:ident::$mod_name:ident} ),*) => {
90        fn encode_metadata(
91          metadata: &MetadataV2,
92          buf: &mut Vec<u8>,
93        ) -> BackupResult<()> {
94            let mut _idx = 0;
95            $(
96                if _idx == 1 {
97                    put_1(buf, &metadata.hummock_version.to_protobuf())?;
98                }
99                put_n(buf, &metadata.$name)?;
100                _idx += 1;
101            )*
102            Ok(())
103        }
104    };
105}
106
107for_all_metadata_models_v2!(define_encode_metadata);
108
109macro_rules! define_decode_metadata {
110    ($( {$name:ident, $mod_path:ident::$mod_name:ident} ),*) => {
111        fn decode_metadata(
112          metadata: &mut MetadataV2,
113          mut buf: &[u8],
114        ) -> BackupResult<()> {
115            let mut _idx = 0;
116            $(
117                if _idx == 1 {
118                    metadata.hummock_version = HummockVersion::from_persisted_protobuf(&get_1(&mut buf)?);
119                }
120                metadata.$name = get_n(&mut buf)?;
121                _idx += 1;
122            )*
123            Ok(())
124        }
125    };
126}
127
128for_all_metadata_models_v2!(define_decode_metadata);
129
130impl Display for MetadataV2 {
131    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
132        writeln!(f, "clusters: {:#?}", self.clusters)?;
133        writeln!(
134            f,
135            "Hummock version: id {}, committed_epoch: {:?}",
136            self.hummock_version.id,
137            self.hummock_version.state_table_info.info(),
138        )?;
139        // optionally dump other metadata
140        Ok(())
141    }
142}
143
144impl Metadata for MetadataV2 {
145    fn encode_to(&self, buf: &mut Vec<u8>) -> BackupResult<()> {
146        encode_metadata(self, buf)
147    }
148
149    fn decode(buf: &[u8]) -> BackupResult<Self>
150    where
151        Self: Sized,
152    {
153        let mut metadata = Self::default();
154        decode_metadata(&mut metadata, buf)?;
155        Ok(metadata)
156    }
157
158    fn hummock_version_ref(&self) -> &HummockVersion {
159        &self.hummock_version
160    }
161
162    fn hummock_version(self) -> HummockVersion {
163        self.hummock_version
164    }
165}
166
167fn put_n<T: Serialize>(buf: &mut Vec<u8>, data: &[T]) -> Result<(), serde_json::Error> {
168    buf.put_u32_le(
169        data.len()
170            .try_into()
171            .unwrap_or_else(|_| panic!("cannot convert {} into u32", data.len())),
172    );
173    for d in data {
174        put_with_len_prefix(buf, d)?;
175    }
176    Ok(())
177}
178
179fn put_1<T: Serialize>(buf: &mut Vec<u8>, data: &T) -> Result<(), serde_json::Error> {
180    put_n(buf, &[data])
181}
182
183fn get_n<'a, T: Deserialize<'a>>(buf: &mut &'a [u8]) -> Result<Vec<T>, serde_json::Error> {
184    let n = buf.get_u32_le() as usize;
185    let mut elements = Vec::with_capacity(n);
186    for _ in 0..n {
187        elements.push(get_with_len_prefix(buf)?);
188    }
189    Ok(elements)
190}
191
192fn get_1<'a, T: Deserialize<'a>>(buf: &mut &'a [u8]) -> Result<T, serde_json::Error> {
193    let elements = get_n(buf)?;
194    assert_eq!(elements.len(), 1);
195    Ok(elements.into_iter().next().unwrap())
196}
197
198fn put_with_len_prefix<T: Serialize>(buf: &mut Vec<u8>, data: &T) -> Result<(), serde_json::Error> {
199    let b = serde_json::to_vec(data)?;
200    buf.put_u32_le(
201        b.len()
202            .try_into()
203            .unwrap_or_else(|_| panic!("cannot convert {} into u32", b.len())),
204    );
205    buf.put_slice(&b);
206    Ok(())
207}
208
209fn get_with_len_prefix<'a, T: Deserialize<'a>>(buf: &mut &'a [u8]) -> Result<T, serde_json::Error> {
210    let len = buf.get_u32_le() as usize;
211    let d = serde_json::from_slice(&buf[..len])?;
212    buf.advance(len);
213    Ok(d)
214}