risingwave_backup/
meta_snapshot_v2.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::fmt::{Display, Formatter};
17
18use anyhow::anyhow;
19use bytes::{Buf, BufMut};
20use itertools::Itertools;
21use risingwave_hummock_sdk::HummockRawObjectId;
22use risingwave_hummock_sdk::change_log::EpochNewChangeLog;
23use risingwave_hummock_sdk::version::HummockVersion;
24use serde::{Deserialize, Serialize};
25
26use crate::meta_snapshot::{MetaSnapshot, Metadata};
27use crate::{BackupError, BackupResult};
/// A meta snapshot whose payload is [`MetadataV2`].
pub type MetaSnapshotV2 = MetaSnapshot<MetadataV2>;
29
30impl From<serde_json::Error> for BackupError {
31    fn from(value: serde_json::Error) -> Self {
32        BackupError::Other(value.into())
33    }
34}
35
/// Invokes the given macro with every meta-model table stored in a V2 meta snapshot, passed as
/// `{field_name, crate::module}` pairs separated by commas (no trailing comma).
///
/// The encoder and decoder walk this list positionally, so an entry's position fixes where its
/// rows live in the serialized snapshot: add new items at the end and never change the order.
#[macro_export]
macro_rules! for_all_metadata_models_v2 {
    ($callback:ident) => {
        $callback! {
            {seaql_migrations, risingwave_meta_model::serde_seaql_migration},
            {version_stats, risingwave_meta_model::hummock_version_stats},
            {compaction_configs, risingwave_meta_model::compaction_config},
            {clusters, risingwave_meta_model::cluster},
            {fragment_relation, risingwave_meta_model::fragment_relation},
            {catalog_versions, risingwave_meta_model::catalog_version},
            {connections, risingwave_meta_model::connection},
            {databases, risingwave_meta_model::database},
            {fragments, risingwave_meta_model::fragment},
            {functions, risingwave_meta_model::function},
            {indexes, risingwave_meta_model::index},
            {objects, risingwave_meta_model::object},
            {object_dependencies, risingwave_meta_model::object_dependency},
            {schemas, risingwave_meta_model::schema},
            {sinks, risingwave_meta_model::sink},
            {sources, risingwave_meta_model::source},
            {streaming_jobs, risingwave_meta_model::streaming_job},
            {subscriptions, risingwave_meta_model::subscription},
            {system_parameters, risingwave_meta_model::system_parameter},
            {tables, risingwave_meta_model::table},
            {users, risingwave_meta_model::user},
            {user_privileges, risingwave_meta_model::user_privilege},
            {views, risingwave_meta_model::view},
            {workers, risingwave_meta_model::worker},
            {worker_properties, risingwave_meta_model::worker_property},
            {hummock_sequences, risingwave_meta_model::hummock_sequence},
            {session_parameters, risingwave_meta_model::session_parameter},
            {secrets, risingwave_meta_model::secret},
            {exactly_once_iceberg_sinks, risingwave_meta_model::exactly_once_iceberg_sink},
            {iceberg_tables, risingwave_meta_model::iceberg_tables},
            {iceberg_namespace_properties, risingwave_meta_model::iceberg_namespace_properties},
            {user_default_privilege, risingwave_meta_model::user_default_privilege},
            {fragment_splits, risingwave_meta_model::fragment_splits},
            {pending_sink_state, risingwave_meta_model::pending_sink_state},
            {refresh_jobs, risingwave_meta_model::refresh_job},
            {cdc_table_snapshot_splits, risingwave_meta_model::cdc_table_snapshot_split},
            {hummock_table_change_logs, risingwave_meta_model::hummock_table_change_log}
        }
    };
}
81
82macro_rules! define_metadata_v2 {
83    ($({ $name:ident, $mod_path:ident::$mod_name:ident }),*) => {
84        #[derive(Default)]
85        pub struct MetadataV2 {
86            pub hummock_version: HummockVersion,
87            $(
88                pub $name: Vec<$mod_path::$mod_name::Model>,
89            )*
90        }
91    };
92}
93
94for_all_metadata_models_v2!(define_metadata_v2);
95
96macro_rules! define_encode_metadata {
97    ($( {$name:ident, $mod_path:ident::$mod_name:ident} ),*) => {
98        fn encode_metadata(
99          metadata: &MetadataV2,
100          buf: &mut Vec<u8>,
101        ) -> BackupResult<()> {
102            let mut _idx = 0;
103            $(
104                if _idx == 1 {
105                    put_1(buf, &metadata.hummock_version.to_protobuf())?;
106                }
107                put_n(buf, &metadata.$name)?;
108                _idx += 1;
109            )*
110            Ok(())
111        }
112    };
113}
114
115for_all_metadata_models_v2!(define_encode_metadata);
116
117macro_rules! define_decode_metadata {
118    ($( {$name:ident, $mod_path:ident::$mod_name:ident} ),*) => {
119        fn decode_metadata(
120          metadata: &mut MetadataV2,
121          mut buf: &[u8],
122        ) -> BackupResult<()> {
123            let mut _idx = 0;
124            $(
125                if _idx == 1 {
126                    metadata.hummock_version = HummockVersion::from_persisted_protobuf_owned(get_1(&mut buf)?);
127                }
128                metadata.$name = get_n(&mut buf)?;
129                _idx += 1;
130            )*
131            Ok(())
132        }
133    };
134}
135
136for_all_metadata_models_v2!(define_decode_metadata);
137
138impl Display for MetadataV2 {
139    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
140        writeln!(f, "clusters: {:#?}", self.clusters)?;
141        writeln!(
142            f,
143            "Hummock version: id {}, committed_epoch: {:?}",
144            self.hummock_version.id,
145            self.hummock_version.state_table_info.info(),
146        )?;
147        // optionally dump other metadata
148        Ok(())
149    }
150}
151
152impl Metadata for MetadataV2 {
153    fn encode_to(&self, buf: &mut Vec<u8>) -> BackupResult<()> {
154        encode_metadata(self, buf)
155    }
156
157    fn decode(buf: &[u8]) -> BackupResult<Self>
158    where
159        Self: Sized,
160    {
161        let mut metadata = Self::default();
162        decode_metadata(&mut metadata, buf)?;
163        Ok(metadata)
164    }
165
166    fn hummock_version_ref(&self) -> &HummockVersion {
167        &self.hummock_version
168    }
169
170    fn hummock_version(self) -> HummockVersion {
171        self.hummock_version
172    }
173
174    fn storage_url(&self) -> BackupResult<String> {
175        let storage_url_from_snapshot = self
176            .system_parameters
177            .iter()
178            .filter_map(|m| {
179                if m.name == "state_store" {
180                    return Some(m.value.clone());
181                }
182                None
183            })
184            .exactly_one()
185            .map_err(|_| BackupError::Other(anyhow!("expect state_store")))?;
186        storage_url_from_snapshot
187            .strip_prefix("hummock+")
188            .map(|s| s.to_owned())
189            .ok_or_else(|| {
190                BackupError::Other(anyhow!(
191                    "invalid state_store from metadata snapshot: {}",
192                    storage_url_from_snapshot
193                ))
194            })
195    }
196
197    fn storage_directory(&self) -> BackupResult<String> {
198        self.system_parameters
199            .iter()
200            .filter_map(|m| {
201                if m.name == "data_directory" {
202                    return Some(m.value.clone());
203                }
204                None
205            })
206            .exactly_one()
207            .map_err(|_| BackupError::Other(anyhow!("expect data_directory")))
208    }
209
210    fn table_change_log_object_ids(&self) -> HashSet<HummockRawObjectId> {
211        self.hummock_table_change_logs
212            .iter()
213            .flat_map(|m| {
214                // We cannot use `change_log_ssts` here because `to_table_change_log` returns an owned value, not a reference.
215                let EpochNewChangeLog {
216                    new_value,
217                    old_value,
218                    ..
219                } = to_table_change_log(m);
220                new_value
221                    .into_iter()
222                    .chain(old_value)
223                    .map(|t| t.object_id.as_raw())
224            })
225            .collect()
226    }
227}
228
229fn to_table_change_log(
230    change_log: &risingwave_meta_model::hummock_table_change_log::Model,
231) -> EpochNewChangeLog {
232    EpochNewChangeLog {
233        new_value: change_log
234            .new_value_sst
235            .to_protobuf()
236            .into_iter()
237            .map(Into::into)
238            .collect(),
239        old_value: change_log
240            .old_value_sst
241            .to_protobuf()
242            .into_iter()
243            .map(Into::into)
244            .collect(),
245        non_checkpoint_epochs: change_log
246            .non_checkpoint_epochs
247            .0
248            .iter()
249            .map(|e| *e as _)
250            .collect(),
251        checkpoint_epoch: change_log.checkpoint_epoch as _,
252    }
253}
254
255fn put_n<T: Serialize>(buf: &mut Vec<u8>, data: &[T]) -> Result<(), serde_json::Error> {
256    buf.put_u32_le(
257        data.len()
258            .try_into()
259            .unwrap_or_else(|_| panic!("cannot convert {} into u32", data.len())),
260    );
261    for d in data {
262        put_with_len_prefix(buf, d)?;
263    }
264    Ok(())
265}
266
267fn put_1<T: Serialize>(buf: &mut Vec<u8>, data: &T) -> Result<(), serde_json::Error> {
268    put_n(buf, &[data])
269}
270
271fn get_n<'a, T: Deserialize<'a>>(buf: &mut &'a [u8]) -> Result<Vec<T>, serde_json::Error> {
272    let n = buf.get_u32_le() as usize;
273    let mut elements = Vec::with_capacity(n);
274    for _ in 0..n {
275        elements.push(get_with_len_prefix(buf)?);
276    }
277    Ok(elements)
278}
279
280fn get_1<'a, T: Deserialize<'a>>(buf: &mut &'a [u8]) -> Result<T, serde_json::Error> {
281    let elements = get_n(buf)?;
282    assert_eq!(elements.len(), 1);
283    Ok(elements.into_iter().next().unwrap())
284}
285
286fn put_with_len_prefix<T: Serialize>(buf: &mut Vec<u8>, data: &T) -> Result<(), serde_json::Error> {
287    let b = serde_json::to_vec(data)?;
288    buf.put_u32_le(
289        b.len()
290            .try_into()
291            .unwrap_or_else(|_| panic!("cannot convert {} into u32", b.len())),
292    );
293    buf.put_slice(&b);
294    Ok(())
295}
296
297fn get_with_len_prefix<'a, T: Deserialize<'a>>(buf: &mut &'a [u8]) -> Result<T, serde_json::Error> {
298    let len = buf.get_u32_le() as usize;
299    let d = serde_json::from_slice(&buf[..len])?;
300    buf.advance(len);
301    Ok(d)
302}