risingwave_backup/
meta_snapshot_v2.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{Display, Formatter};
16
17use anyhow::anyhow;
18use bytes::{Buf, BufMut};
19use itertools::Itertools;
20use risingwave_hummock_sdk::version::HummockVersion;
21use serde::{Deserialize, Serialize};
22
23use crate::meta_snapshot::{MetaSnapshot, Metadata};
24use crate::{BackupError, BackupResult};
/// A [`MetaSnapshot`] whose metadata payload is [`MetadataV2`].
pub type MetaSnapshotV2 = MetaSnapshot<MetadataV2>;
26
27impl From<serde_json::Error> for BackupError {
28    fn from(value: serde_json::Error) -> Self {
29        BackupError::Other(value.into())
30    }
31}
32
/// Invokes `$macro` with the complete, ordered list of
/// `{field_name, model_module_path}` entries that make up a v2 metadata snapshot.
///
/// The other macros in this file generate the `MetadataV2` fields as well as the
/// `encode_metadata` / `decode_metadata` functions from this list, and the
/// encoder and decoder process the entries strictly in list order.
///
/// Add new items at the end. Do not change the order.
#[macro_export]
macro_rules! for_all_metadata_models_v2 {
    ($macro:ident) => {
        $macro! {
            {seaql_migrations, risingwave_meta_model::serde_seaql_migration},
            {version_stats, risingwave_meta_model::hummock_version_stats},
            {compaction_configs, risingwave_meta_model::compaction_config},
            {clusters, risingwave_meta_model::cluster},
            {fragment_relation, risingwave_meta_model::fragment_relation},
            {catalog_versions, risingwave_meta_model::catalog_version},
            {connections, risingwave_meta_model::connection},
            {databases, risingwave_meta_model::database},
            {fragments, risingwave_meta_model::fragment},
            {functions, risingwave_meta_model::function},
            {indexes, risingwave_meta_model::index},
            {objects, risingwave_meta_model::object},
            {object_dependencies, risingwave_meta_model::object_dependency},
            {schemas, risingwave_meta_model::schema},
            {sinks, risingwave_meta_model::sink},
            {sources, risingwave_meta_model::source},
            {streaming_jobs, risingwave_meta_model::streaming_job},
            {subscriptions, risingwave_meta_model::subscription},
            {system_parameters, risingwave_meta_model::system_parameter},
            {tables, risingwave_meta_model::table},
            {users, risingwave_meta_model::user},
            {user_privileges, risingwave_meta_model::user_privilege},
            {views, risingwave_meta_model::view},
            {workers, risingwave_meta_model::worker},
            {worker_properties, risingwave_meta_model::worker_property},
            {hummock_sequences, risingwave_meta_model::hummock_sequence},
            {session_parameters, risingwave_meta_model::session_parameter},
            {secrets, risingwave_meta_model::secret},
            {exactly_once_iceberg_sinks, risingwave_meta_model::exactly_once_iceberg_sink},
            {iceberg_tables, risingwave_meta_model::iceberg_tables},
            {iceberg_namespace_properties, risingwave_meta_model::iceberg_namespace_properties},
            {user_default_privilege, risingwave_meta_model::user_default_privilege},
            {fragment_splits, risingwave_meta_model::fragment_splits},
            {pending_sink_state, risingwave_meta_model::pending_sink_state},
            {refresh_jobs, risingwave_meta_model::refresh_job},
            {cdc_table_snapshot_splits, risingwave_meta_model::cdc_table_snapshot_split}
        }
    };
}
77
78macro_rules! define_metadata_v2 {
79    ($({ $name:ident, $mod_path:ident::$mod_name:ident }),*) => {
80        #[derive(Default)]
81        pub struct MetadataV2 {
82            pub hummock_version: HummockVersion,
83            $(
84                pub $name: Vec<$mod_path::$mod_name::Model>,
85            )*
86        }
87    };
88}
89
90for_all_metadata_models_v2!(define_metadata_v2);
91
92macro_rules! define_encode_metadata {
93    ($( {$name:ident, $mod_path:ident::$mod_name:ident} ),*) => {
94        fn encode_metadata(
95          metadata: &MetadataV2,
96          buf: &mut Vec<u8>,
97        ) -> BackupResult<()> {
98            let mut _idx = 0;
99            $(
100                if _idx == 1 {
101                    put_1(buf, &metadata.hummock_version.to_protobuf())?;
102                }
103                put_n(buf, &metadata.$name)?;
104                _idx += 1;
105            )*
106            Ok(())
107        }
108    };
109}
110
111for_all_metadata_models_v2!(define_encode_metadata);
112
113macro_rules! define_decode_metadata {
114    ($( {$name:ident, $mod_path:ident::$mod_name:ident} ),*) => {
115        fn decode_metadata(
116          metadata: &mut MetadataV2,
117          mut buf: &[u8],
118        ) -> BackupResult<()> {
119            let mut _idx = 0;
120            $(
121                if _idx == 1 {
122                    metadata.hummock_version = HummockVersion::from_persisted_protobuf(&get_1(&mut buf)?);
123                }
124                metadata.$name = get_n(&mut buf)?;
125                _idx += 1;
126            )*
127            Ok(())
128        }
129    };
130}
131
132for_all_metadata_models_v2!(define_decode_metadata);
133
134impl Display for MetadataV2 {
135    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
136        writeln!(f, "clusters: {:#?}", self.clusters)?;
137        writeln!(
138            f,
139            "Hummock version: id {}, committed_epoch: {:?}",
140            self.hummock_version.id,
141            self.hummock_version.state_table_info.info(),
142        )?;
143        // optionally dump other metadata
144        Ok(())
145    }
146}
147
148impl Metadata for MetadataV2 {
149    fn encode_to(&self, buf: &mut Vec<u8>) -> BackupResult<()> {
150        encode_metadata(self, buf)
151    }
152
153    fn decode(buf: &[u8]) -> BackupResult<Self>
154    where
155        Self: Sized,
156    {
157        let mut metadata = Self::default();
158        decode_metadata(&mut metadata, buf)?;
159        Ok(metadata)
160    }
161
162    fn hummock_version_ref(&self) -> &HummockVersion {
163        &self.hummock_version
164    }
165
166    fn hummock_version(self) -> HummockVersion {
167        self.hummock_version
168    }
169
170    fn storage_url(&self) -> BackupResult<String> {
171        let storage_url_from_snapshot = self
172            .system_parameters
173            .iter()
174            .filter_map(|m| {
175                if m.name == "state_store" {
176                    return Some(m.value.clone());
177                }
178                None
179            })
180            .exactly_one()
181            .map_err(|_| BackupError::Other(anyhow!("expect state_store")))?;
182        storage_url_from_snapshot
183            .strip_prefix("hummock+")
184            .map(|s| s.to_owned())
185            .ok_or_else(|| {
186                BackupError::Other(anyhow!(
187                    "invalid state_store from metadata snapshot: {}",
188                    storage_url_from_snapshot
189                ))
190            })
191    }
192
193    fn storage_directory(&self) -> BackupResult<String> {
194        self.system_parameters
195            .iter()
196            .filter_map(|m| {
197                if m.name == "data_directory" {
198                    return Some(m.value.clone());
199                }
200                None
201            })
202            .exactly_one()
203            .map_err(|_| BackupError::Other(anyhow!("expect data_directory")))
204    }
205}
206
207fn put_n<T: Serialize>(buf: &mut Vec<u8>, data: &[T]) -> Result<(), serde_json::Error> {
208    buf.put_u32_le(
209        data.len()
210            .try_into()
211            .unwrap_or_else(|_| panic!("cannot convert {} into u32", data.len())),
212    );
213    for d in data {
214        put_with_len_prefix(buf, d)?;
215    }
216    Ok(())
217}
218
219fn put_1<T: Serialize>(buf: &mut Vec<u8>, data: &T) -> Result<(), serde_json::Error> {
220    put_n(buf, &[data])
221}
222
223fn get_n<'a, T: Deserialize<'a>>(buf: &mut &'a [u8]) -> Result<Vec<T>, serde_json::Error> {
224    let n = buf.get_u32_le() as usize;
225    let mut elements = Vec::with_capacity(n);
226    for _ in 0..n {
227        elements.push(get_with_len_prefix(buf)?);
228    }
229    Ok(elements)
230}
231
232fn get_1<'a, T: Deserialize<'a>>(buf: &mut &'a [u8]) -> Result<T, serde_json::Error> {
233    let elements = get_n(buf)?;
234    assert_eq!(elements.len(), 1);
235    Ok(elements.into_iter().next().unwrap())
236}
237
238fn put_with_len_prefix<T: Serialize>(buf: &mut Vec<u8>, data: &T) -> Result<(), serde_json::Error> {
239    let b = serde_json::to_vec(data)?;
240    buf.put_u32_le(
241        b.len()
242            .try_into()
243            .unwrap_or_else(|_| panic!("cannot convert {} into u32", b.len())),
244    );
245    buf.put_slice(&b);
246    Ok(())
247}
248
249fn get_with_len_prefix<'a, T: Deserialize<'a>>(buf: &mut &'a [u8]) -> Result<T, serde_json::Error> {
250    let len = buf.get_u32_le() as usize;
251    let d = serde_json::from_slice(&buf[..len])?;
252    buf.advance(len);
253    Ok(d)
254}