1use std::collections::HashSet;
16use std::fmt::{Display, Formatter};
17
18use anyhow::anyhow;
19use bytes::{Buf, BufMut};
20use itertools::Itertools;
21use risingwave_hummock_sdk::HummockRawObjectId;
22use risingwave_hummock_sdk::change_log::EpochNewChangeLog;
23use risingwave_hummock_sdk::version::HummockVersion;
24use serde::{Deserialize, Serialize};
25
26use crate::meta_snapshot::{MetaSnapshot, Metadata};
27use crate::{BackupError, BackupResult};
/// A meta snapshot whose metadata payload is [`MetadataV2`].
pub type MetaSnapshotV2 = MetaSnapshot<MetadataV2>;
29
30impl From<serde_json::Error> for BackupError {
31 fn from(value: serde_json::Error) -> Self {
32 BackupError::Other(value.into())
33 }
34}
35
/// Invokes `$macro` with one `{field_name, module_path}` entry per metadata table that a V2
/// meta snapshot captures.
///
/// The entry order is significant: `define_encode_metadata` / `define_decode_metadata` in this
/// file expand the list in order, and the encoded snapshot carries no per-table tags, so the
/// decoder relies on reading the tables back in exactly this sequence.
// NOTE(review): reordering or inserting entries changes the byte layout of snapshots;
// presumably new tables must only be appended at the end — confirm against the backup
// compatibility policy. Snapshots written before a trailing entry existed will not contain
// that section, so check how the decoder behaves on such input before appending.
#[macro_export]
macro_rules! for_all_metadata_models_v2 {
    ($macro:ident) => {
        $macro! {
            {seaql_migrations, risingwave_meta_model::serde_seaql_migration},
            {version_stats, risingwave_meta_model::hummock_version_stats},
            {compaction_configs, risingwave_meta_model::compaction_config},
            {clusters, risingwave_meta_model::cluster},
            {fragment_relation, risingwave_meta_model::fragment_relation},
            {catalog_versions, risingwave_meta_model::catalog_version},
            {connections, risingwave_meta_model::connection},
            {databases, risingwave_meta_model::database},
            {fragments, risingwave_meta_model::fragment},
            {functions, risingwave_meta_model::function},
            {indexes, risingwave_meta_model::index},
            {objects, risingwave_meta_model::object},
            {object_dependencies, risingwave_meta_model::object_dependency},
            {schemas, risingwave_meta_model::schema},
            {sinks, risingwave_meta_model::sink},
            {sources, risingwave_meta_model::source},
            {streaming_jobs, risingwave_meta_model::streaming_job},
            {subscriptions, risingwave_meta_model::subscription},
            {system_parameters, risingwave_meta_model::system_parameter},
            {tables, risingwave_meta_model::table},
            {users, risingwave_meta_model::user},
            {user_privileges, risingwave_meta_model::user_privilege},
            {views, risingwave_meta_model::view},
            {workers, risingwave_meta_model::worker},
            {worker_properties, risingwave_meta_model::worker_property},
            {hummock_sequences, risingwave_meta_model::hummock_sequence},
            {session_parameters, risingwave_meta_model::session_parameter},
            {secrets, risingwave_meta_model::secret},
            {exactly_once_iceberg_sinks, risingwave_meta_model::exactly_once_iceberg_sink},
            {iceberg_tables, risingwave_meta_model::iceberg_tables},
            {iceberg_namespace_properties, risingwave_meta_model::iceberg_namespace_properties},
            {user_default_privilege, risingwave_meta_model::user_default_privilege},
            {fragment_splits, risingwave_meta_model::fragment_splits},
            {pending_sink_state, risingwave_meta_model::pending_sink_state},
            {refresh_jobs, risingwave_meta_model::refresh_job},
            {cdc_table_snapshot_splits, risingwave_meta_model::cdc_table_snapshot_split},
            {hummock_table_change_logs, risingwave_meta_model::hummock_table_change_log}
        }
    };
}
81
/// Defines `MetadataV2`: the hummock version plus one `Vec` of rows per table listed in
/// `for_all_metadata_models_v2`, each holding that module's `Model` type.
macro_rules! define_metadata_v2 {
    ($({ $name:ident, $mod_path:ident::$mod_name:ident }),*) => {
        /// Contents of a V2 meta snapshot: the hummock version and the rows of every
        /// captured metadata table.
        #[derive(Default)]
        pub struct MetadataV2 {
            pub hummock_version: HummockVersion,
            $(
                pub $name: Vec<$mod_path::$mod_name::Model>,
            )*
        }
    };
}

for_all_metadata_models_v2!(define_metadata_v2);
95
/// Defines `encode_metadata`, which serializes a `MetadataV2` into `buf` section by section.
///
/// Layout: every table is written with `put_n` in the order given by
/// `for_all_metadata_models_v2`. The hummock version (converted with `to_protobuf` and written
/// via `put_1`) is inserted right after the first table and before the second one.
macro_rules! define_encode_metadata {
    ($( {$name:ident, $mod_path:ident::$mod_name:ident} ),*) => {
        fn encode_metadata(
            metadata: &MetadataV2,
            buf: &mut Vec<u8>,
        ) -> BackupResult<()> {
            // Index of the table about to be written; only used to splice the hummock version
            // in between table 0 and table 1. The leading underscore presumably silences the
            // unused-assignment lint on the final increment.
            // NOTE(review): this position is part of the persisted format and must match
            // `decode_metadata`; do not move it without a format change.
            let mut _idx = 0;
            $(
                if _idx == 1 {
                    put_1(buf, &metadata.hummock_version.to_protobuf())?;
                }
                put_n(buf, &metadata.$name)?;
                _idx += 1;
            )*
            Ok(())
        }
    };
}

for_all_metadata_models_v2!(define_encode_metadata);
116
/// Defines `decode_metadata`, the inverse of `encode_metadata`: it reads the tables back in
/// `for_all_metadata_models_v2` order into `metadata`, taking the single-element hummock
/// version section that sits between the first and second table.
///
/// serde_json errors are converted into `BackupError` by `?` through the `From` impl in this
/// file. Any bytes left after the last table are not inspected.
macro_rules! define_decode_metadata {
    ($( {$name:ident, $mod_path:ident::$mod_name:ident} ),*) => {
        fn decode_metadata(
            metadata: &mut MetadataV2,
            mut buf: &[u8],
        ) -> BackupResult<()> {
            // Mirrors the splice position used by `encode_metadata`: the hummock version is
            // read before table 1.
            let mut _idx = 0;
            $(
                if _idx == 1 {
                    metadata.hummock_version = HummockVersion::from_persisted_protobuf_owned(get_1(&mut buf)?);
                }
                metadata.$name = get_n(&mut buf)?;
                _idx += 1;
            )*
            Ok(())
        }
    };
}

for_all_metadata_models_v2!(define_decode_metadata);
137
138impl Display for MetadataV2 {
139 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
140 writeln!(f, "clusters: {:#?}", self.clusters)?;
141 writeln!(
142 f,
143 "Hummock version: id {}, committed_epoch: {:?}",
144 self.hummock_version.id,
145 self.hummock_version.state_table_info.info(),
146 )?;
147 Ok(())
149 }
150}
151
152impl Metadata for MetadataV2 {
153 fn encode_to(&self, buf: &mut Vec<u8>) -> BackupResult<()> {
154 encode_metadata(self, buf)
155 }
156
157 fn decode(buf: &[u8]) -> BackupResult<Self>
158 where
159 Self: Sized,
160 {
161 let mut metadata = Self::default();
162 decode_metadata(&mut metadata, buf)?;
163 Ok(metadata)
164 }
165
166 fn hummock_version_ref(&self) -> &HummockVersion {
167 &self.hummock_version
168 }
169
170 fn hummock_version(self) -> HummockVersion {
171 self.hummock_version
172 }
173
174 fn storage_url(&self) -> BackupResult<String> {
175 let storage_url_from_snapshot = self
176 .system_parameters
177 .iter()
178 .filter_map(|m| {
179 if m.name == "state_store" {
180 return Some(m.value.clone());
181 }
182 None
183 })
184 .exactly_one()
185 .map_err(|_| BackupError::Other(anyhow!("expect state_store")))?;
186 storage_url_from_snapshot
187 .strip_prefix("hummock+")
188 .map(|s| s.to_owned())
189 .ok_or_else(|| {
190 BackupError::Other(anyhow!(
191 "invalid state_store from metadata snapshot: {}",
192 storage_url_from_snapshot
193 ))
194 })
195 }
196
197 fn storage_directory(&self) -> BackupResult<String> {
198 self.system_parameters
199 .iter()
200 .filter_map(|m| {
201 if m.name == "data_directory" {
202 return Some(m.value.clone());
203 }
204 None
205 })
206 .exactly_one()
207 .map_err(|_| BackupError::Other(anyhow!("expect data_directory")))
208 }
209
210 fn table_change_log_object_ids(&self) -> HashSet<HummockRawObjectId> {
211 self.hummock_table_change_logs
212 .iter()
213 .flat_map(|m| {
214 let EpochNewChangeLog {
216 new_value,
217 old_value,
218 ..
219 } = to_table_change_log(m);
220 new_value
221 .into_iter()
222 .chain(old_value)
223 .map(|t| t.object_id.as_raw())
224 })
225 .collect()
226 }
227}
228
229fn to_table_change_log(
230 change_log: &risingwave_meta_model::hummock_table_change_log::Model,
231) -> EpochNewChangeLog {
232 EpochNewChangeLog {
233 new_value: change_log
234 .new_value_sst
235 .to_protobuf()
236 .into_iter()
237 .map(Into::into)
238 .collect(),
239 old_value: change_log
240 .old_value_sst
241 .to_protobuf()
242 .into_iter()
243 .map(Into::into)
244 .collect(),
245 non_checkpoint_epochs: change_log
246 .non_checkpoint_epochs
247 .0
248 .iter()
249 .map(|e| *e as _)
250 .collect(),
251 checkpoint_epoch: change_log.checkpoint_epoch as _,
252 }
253}
254
255fn put_n<T: Serialize>(buf: &mut Vec<u8>, data: &[T]) -> Result<(), serde_json::Error> {
256 buf.put_u32_le(
257 data.len()
258 .try_into()
259 .unwrap_or_else(|_| panic!("cannot convert {} into u32", data.len())),
260 );
261 for d in data {
262 put_with_len_prefix(buf, d)?;
263 }
264 Ok(())
265}
266
267fn put_1<T: Serialize>(buf: &mut Vec<u8>, data: &T) -> Result<(), serde_json::Error> {
268 put_n(buf, &[data])
269}
270
271fn get_n<'a, T: Deserialize<'a>>(buf: &mut &'a [u8]) -> Result<Vec<T>, serde_json::Error> {
272 let n = buf.get_u32_le() as usize;
273 let mut elements = Vec::with_capacity(n);
274 for _ in 0..n {
275 elements.push(get_with_len_prefix(buf)?);
276 }
277 Ok(elements)
278}
279
280fn get_1<'a, T: Deserialize<'a>>(buf: &mut &'a [u8]) -> Result<T, serde_json::Error> {
281 let elements = get_n(buf)?;
282 assert_eq!(elements.len(), 1);
283 Ok(elements.into_iter().next().unwrap())
284}
285
286fn put_with_len_prefix<T: Serialize>(buf: &mut Vec<u8>, data: &T) -> Result<(), serde_json::Error> {
287 let b = serde_json::to_vec(data)?;
288 buf.put_u32_le(
289 b.len()
290 .try_into()
291 .unwrap_or_else(|_| panic!("cannot convert {} into u32", b.len())),
292 );
293 buf.put_slice(&b);
294 Ok(())
295}
296
297fn get_with_len_prefix<'a, T: Deserialize<'a>>(buf: &mut &'a [u8]) -> Result<T, serde_json::Error> {
298 let len = buf.get_u32_le() as usize;
299 let d = serde_json::from_slice(&buf[..len])?;
300 buf.advance(len);
301 Ok(d)
302}