Skip to main content

risingwave_backup/
meta_snapshot_v2.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::fmt::{Display, Formatter};
17
18use anyhow::anyhow;
19use bytes::{Buf, BufMut};
20use itertools::Itertools;
21use risingwave_hummock_sdk::HummockRawObjectId;
22use risingwave_hummock_sdk::change_log::EpochNewChangeLog;
23use risingwave_hummock_sdk::version::HummockVersion;
24use serde::{Deserialize, Serialize};
25
26use crate::meta_snapshot::{MetaSnapshot, Metadata};
27use crate::{BackupError, BackupResult};
/// [`MetaSnapshot`] specialised to carry a [`MetadataV2`] payload.
pub type MetaSnapshotV2 = MetaSnapshot<MetadataV2>;
29
30impl From<serde_json::Error> for BackupError {
31    fn from(value: serde_json::Error) -> Self {
32        BackupError::Other(value.into())
33    }
34}
35
/// Invokes `$macro!` with the complete, ordered list of `{field, model module}` pairs that
/// make up the V2 metadata snapshot.
///
/// In this file the list is consumed by `define_metadata_v2`, `define_encode_metadata` and
/// `define_decode_metadata`: the first identifier of each pair becomes a field of
/// `MetadataV2`, and the path names the module whose `Model` type that field stores.
///
/// Add new item in the end. Do not change the order.
/// The encoder and decoder walk the list in exactly this order, so the order is part of the
/// encoded layout.
#[macro_export]
macro_rules! for_all_metadata_models_v2 {
    ($macro:ident) => {
        $macro! {
            {seaql_migrations, risingwave_meta_model::serde_seaql_migration},
            {version_stats, risingwave_meta_model::hummock_version_stats},
            {compaction_configs, risingwave_meta_model::compaction_config},
            {clusters, risingwave_meta_model::cluster},
            {fragment_relation, risingwave_meta_model::fragment_relation},
            {catalog_versions, risingwave_meta_model::catalog_version},
            {connections, risingwave_meta_model::connection},
            {databases, risingwave_meta_model::database},
            {fragments, risingwave_meta_model::fragment},
            {functions, risingwave_meta_model::function},
            {indexes, risingwave_meta_model::index},
            {objects, risingwave_meta_model::object},
            {object_dependencies, risingwave_meta_model::object_dependency},
            {schemas, risingwave_meta_model::schema},
            {sinks, risingwave_meta_model::sink},
            {sources, risingwave_meta_model::source},
            {streaming_jobs, risingwave_meta_model::streaming_job},
            {subscriptions, risingwave_meta_model::subscription},
            {system_parameters, risingwave_meta_model::system_parameter},
            {tables, risingwave_meta_model::table},
            {users, risingwave_meta_model::user},
            {user_privileges, risingwave_meta_model::user_privilege},
            {views, risingwave_meta_model::view},
            {workers, risingwave_meta_model::worker},
            {worker_properties, risingwave_meta_model::worker_property},
            {hummock_sequences, risingwave_meta_model::hummock_sequence},
            {session_parameters, risingwave_meta_model::session_parameter},
            {secrets, risingwave_meta_model::secret},
            {exactly_once_iceberg_sinks, risingwave_meta_model::exactly_once_iceberg_sink},
            {iceberg_tables, risingwave_meta_model::iceberg_tables},
            {iceberg_namespace_properties, risingwave_meta_model::iceberg_namespace_properties},
            {user_default_privilege, risingwave_meta_model::user_default_privilege},
            {fragment_splits, risingwave_meta_model::fragment_splits},
            {pending_sink_state, risingwave_meta_model::pending_sink_state},
            {refresh_jobs, risingwave_meta_model::refresh_job},
            {cdc_table_snapshot_splits, risingwave_meta_model::cdc_table_snapshot_split},
            {hummock_table_change_logs, risingwave_meta_model::hummock_table_change_log}
        }
    };
}
81
// Generates the `MetadataV2` struct from the `{field, crate::module}` pairs it is given:
// a fixed `hummock_version` field followed by one `Vec<crate::module::Model>` field per
// pair, in the order the pairs are supplied. The struct derives `Default`, which
// `Metadata::decode` uses to obtain an empty value to fill in.
macro_rules! define_metadata_v2 {
    ($({ $name:ident, $mod_path:ident::$mod_name:ident }),*) => {
        #[derive(Default)]
        pub struct MetadataV2 {
            pub hummock_version: HummockVersion,
            $(
                pub $name: Vec<$mod_path::$mod_name::Model>,
            )*
        }
    };
}

// Instantiate `MetadataV2` with every model listed in `for_all_metadata_models_v2`.
for_all_metadata_models_v2!(define_metadata_v2);
95
// Generates `encode_metadata`, which appends every field of a `MetadataV2` to `buf`.
//
// Layout produced, following the order of the pairs passed to the macro:
//   1. the first model list (`put_n`),
//   2. the Hummock version converted with `to_protobuf()` (`put_1`),
//   3. every remaining model list (`put_n`), one after another.
// `define_decode_metadata` reads the fields back in the same sequence, so the two macros
// must be kept in sync.
macro_rules! define_encode_metadata {
    ($( {$name:ident, $mod_path:ident::$mod_name:ident} ),*) => {
        fn encode_metadata(
          metadata: &MetadataV2,
          buf: &mut Vec<u8>,
        ) -> BackupResult<()> {
            // Position of the model list currently being written.
            let mut _idx = 0;
            $(
                // The Hummock version is written right before the second model list,
                // i.e. immediately after the first one.
                if _idx == 1 {
                    put_1(buf, &metadata.hummock_version.to_protobuf())?;
                }
                put_n(buf, &metadata.$name)?;
                _idx += 1;
            )*
            Ok(())
        }
    };
}

// Instantiate `encode_metadata` for every model listed in `for_all_metadata_models_v2`.
for_all_metadata_models_v2!(define_encode_metadata);
116
// Generates `decode_metadata`, which fills a `MetadataV2` from the bytes in `buf`.
//
// Fields are read in the order of the pairs passed to the macro, mirroring
// `define_encode_metadata`: the first model list, then the Hummock version, then every
// remaining model list. Any bytes left in `buf` after the last list are not inspected.
macro_rules! define_decode_metadata {
    ($( {$name:ident, $mod_path:ident::$mod_name:ident} ),*) => {
        fn decode_metadata(
          metadata: &mut MetadataV2,
          mut buf: &[u8],
        ) -> BackupResult<()> {
            // Position of the model list currently being read.
            let mut _idx = 0;
            $(
                // The Hummock version sits right before the second model list,
                // i.e. immediately after the first one.
                if _idx == 1 {
                    metadata.hummock_version = HummockVersion::from_persisted_protobuf_owned(get_1(&mut buf)?);
                }
                metadata.$name = get_n(&mut buf)?;
                _idx += 1;
            )*
            Ok(())
        }
    };
}

// Instantiate `decode_metadata` for every model listed in `for_all_metadata_models_v2`.
for_all_metadata_models_v2!(define_decode_metadata);
137
138impl Display for MetadataV2 {
139    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
140        writeln!(f, "clusters: {:#?}", self.clusters)?;
141        writeln!(
142            f,
143            "Hummock version: id {}, committed_epoch: {:?}",
144            self.hummock_version.id,
145            self.hummock_version.state_table_info.info(),
146        )?;
147        // optionally dump other metadata
148        Ok(())
149    }
150}
151
152impl Metadata for MetadataV2 {
153    fn encode_to(&self, buf: &mut Vec<u8>) -> BackupResult<()> {
154        encode_metadata(self, buf)
155    }
156
157    fn decode(buf: &[u8]) -> BackupResult<Self>
158    where
159        Self: Sized,
160    {
161        let mut metadata = Self::default();
162        decode_metadata(&mut metadata, buf)?;
163        Ok(metadata)
164    }
165
166    fn hummock_version_ref(&self) -> &HummockVersion {
167        &self.hummock_version
168    }
169
170    fn hummock_version(self) -> HummockVersion {
171        self.hummock_version
172    }
173
174    fn storage_url(&self) -> BackupResult<String> {
175        let storage_url_from_snapshot =
176            Itertools::exactly_one(self.system_parameters.iter().filter_map(|m| {
177                if m.name == "state_store" {
178                    return Some(m.value.clone());
179                }
180                None
181            }))
182            .map_err(|_| BackupError::Other(anyhow!("expect state_store")))?;
183        storage_url_from_snapshot
184            .strip_prefix("hummock+")
185            .map(|s| s.to_owned())
186            .ok_or_else(|| {
187                BackupError::Other(anyhow!(
188                    "invalid state_store from metadata snapshot: {}",
189                    storage_url_from_snapshot
190                ))
191            })
192    }
193
194    fn storage_directory(&self) -> BackupResult<String> {
195        Itertools::exactly_one(self.system_parameters.iter().filter_map(|m| {
196            if m.name == "data_directory" {
197                return Some(m.value.clone());
198            }
199            None
200        }))
201        .map_err(|_| BackupError::Other(anyhow!("expect data_directory")))
202    }
203
204    fn table_change_log_object_ids(&self) -> HashSet<HummockRawObjectId> {
205        self.hummock_table_change_logs
206            .iter()
207            .flat_map(|m| {
208                // We cannot use `change_log_ssts` here because `to_table_change_log` returns an owned value, not a reference.
209                let EpochNewChangeLog {
210                    new_value,
211                    old_value,
212                    ..
213                } = to_table_change_log(m);
214                new_value
215                    .into_iter()
216                    .chain(old_value)
217                    .map(|t| t.object_id.as_raw())
218            })
219            .collect()
220    }
221}
222
223fn to_table_change_log(
224    change_log: &risingwave_meta_model::hummock_table_change_log::Model,
225) -> EpochNewChangeLog {
226    EpochNewChangeLog {
227        new_value: change_log
228            .new_value_sst
229            .to_protobuf()
230            .into_iter()
231            .map(Into::into)
232            .collect(),
233        old_value: change_log
234            .old_value_sst
235            .to_protobuf()
236            .into_iter()
237            .map(Into::into)
238            .collect(),
239        non_checkpoint_epochs: change_log
240            .non_checkpoint_epochs
241            .0
242            .iter()
243            .map(|e| *e as _)
244            .collect(),
245        checkpoint_epoch: change_log.checkpoint_epoch as _,
246    }
247}
248
249fn put_n<T: Serialize>(buf: &mut Vec<u8>, data: &[T]) -> Result<(), serde_json::Error> {
250    buf.put_u32_le(
251        data.len()
252            .try_into()
253            .unwrap_or_else(|_| panic!("cannot convert {} into u32", data.len())),
254    );
255    for d in data {
256        put_with_len_prefix(buf, d)?;
257    }
258    Ok(())
259}
260
261fn put_1<T: Serialize>(buf: &mut Vec<u8>, data: &T) -> Result<(), serde_json::Error> {
262    put_n(buf, &[data])
263}
264
265fn get_n<'a, T: Deserialize<'a>>(buf: &mut &'a [u8]) -> Result<Vec<T>, serde_json::Error> {
266    let n = buf.get_u32_le() as usize;
267    let mut elements = Vec::with_capacity(n);
268    for _ in 0..n {
269        elements.push(get_with_len_prefix(buf)?);
270    }
271    Ok(elements)
272}
273
274fn get_1<'a, T: Deserialize<'a>>(buf: &mut &'a [u8]) -> Result<T, serde_json::Error> {
275    let elements = get_n(buf)?;
276    assert_eq!(elements.len(), 1);
277    Ok(elements.into_iter().next().unwrap())
278}
279
280fn put_with_len_prefix<T: Serialize>(buf: &mut Vec<u8>, data: &T) -> Result<(), serde_json::Error> {
281    let b = serde_json::to_vec(data)?;
282    buf.put_u32_le(
283        b.len()
284            .try_into()
285            .unwrap_or_else(|_| panic!("cannot convert {} into u32", b.len())),
286    );
287    buf.put_slice(&b);
288    Ok(())
289}
290
291fn get_with_len_prefix<'a, T: Deserialize<'a>>(buf: &mut &'a [u8]) -> Result<T, serde_json::Error> {
292    let len = buf.get_u32_le() as usize;
293    let d = serde_json::from_slice(&buf[..len])?;
294    buf.advance(len);
295    Ok(d)
296}