1use std::fmt::{Display, Formatter};
16
17use bytes::{Buf, BufMut};
18use risingwave_hummock_sdk::version::HummockVersion;
19use serde::{Deserialize, Serialize};
20
21use crate::meta_snapshot::{MetaSnapshot, Metadata};
22use crate::{BackupError, BackupResult};
23pub type MetaSnapshotV2 = MetaSnapshot<MetadataV2>;
24
25impl From<serde_json::Error> for BackupError {
26 fn from(value: serde_json::Error) -> Self {
27 BackupError::Other(value.into())
28 }
29}
30
31#[macro_export]
33macro_rules! for_all_metadata_models_v2 {
34 ($macro:ident) => {
35 $macro! {
36 {seaql_migrations, risingwave_meta_model::serde_seaql_migration},
37 {version_stats, risingwave_meta_model::hummock_version_stats},
38 {compaction_configs, risingwave_meta_model::compaction_config},
39 {clusters, risingwave_meta_model::cluster},
40 {fragment_relation, risingwave_meta_model::fragment_relation},
41 {catalog_versions, risingwave_meta_model::catalog_version},
42 {connections, risingwave_meta_model::connection},
43 {databases, risingwave_meta_model::database},
44 {fragments, risingwave_meta_model::fragment},
45 {functions, risingwave_meta_model::function},
46 {indexes, risingwave_meta_model::index},
47 {objects, risingwave_meta_model::object},
48 {object_dependencies, risingwave_meta_model::object_dependency},
49 {schemas, risingwave_meta_model::schema},
50 {sinks, risingwave_meta_model::sink},
51 {sources, risingwave_meta_model::source},
52 {streaming_jobs, risingwave_meta_model::streaming_job},
53 {subscriptions, risingwave_meta_model::subscription},
54 {system_parameters, risingwave_meta_model::system_parameter},
55 {tables, risingwave_meta_model::table},
56 {users, risingwave_meta_model::user},
57 {user_privileges, risingwave_meta_model::user_privilege},
58 {views, risingwave_meta_model::view},
59 {workers, risingwave_meta_model::worker},
60 {worker_properties, risingwave_meta_model::worker_property},
61 {hummock_sequences, risingwave_meta_model::hummock_sequence},
62 {session_parameters, risingwave_meta_model::session_parameter},
63 {secrets, risingwave_meta_model::secret},
64 {exactly_once_iceberg_sinks, risingwave_meta_model::exactly_once_iceberg_sink},
65 {iceberg_tables, risingwave_meta_model::iceberg_tables},
66 {iceberg_namespace_properties, risingwave_meta_model::iceberg_namespace_properties},
67 {user_default_privilege, risingwave_meta_model::user_default_privilege},
68 {fragment_splits, risingwave_meta_model::fragment_splits},
69 {pending_sink_state, risingwave_meta_model::pending_sink_state},
70 {refresh_jobs, risingwave_meta_model::refresh_job}
71 }
72 };
73}
74
75macro_rules! define_metadata_v2 {
76 ($({ $name:ident, $mod_path:ident::$mod_name:ident }),*) => {
77 #[derive(Default)]
78 pub struct MetadataV2 {
79 pub hummock_version: HummockVersion,
80 $(
81 pub $name: Vec<$mod_path::$mod_name::Model>,
82 )*
83 }
84 };
85}
86
87for_all_metadata_models_v2!(define_metadata_v2);
88
89macro_rules! define_encode_metadata {
90 ($( {$name:ident, $mod_path:ident::$mod_name:ident} ),*) => {
91 fn encode_metadata(
92 metadata: &MetadataV2,
93 buf: &mut Vec<u8>,
94 ) -> BackupResult<()> {
95 let mut _idx = 0;
96 $(
97 if _idx == 1 {
98 put_1(buf, &metadata.hummock_version.to_protobuf())?;
99 }
100 put_n(buf, &metadata.$name)?;
101 _idx += 1;
102 )*
103 Ok(())
104 }
105 };
106}
107
108for_all_metadata_models_v2!(define_encode_metadata);
109
110macro_rules! define_decode_metadata {
111 ($( {$name:ident, $mod_path:ident::$mod_name:ident} ),*) => {
112 fn decode_metadata(
113 metadata: &mut MetadataV2,
114 mut buf: &[u8],
115 ) -> BackupResult<()> {
116 let mut _idx = 0;
117 $(
118 if _idx == 1 {
119 metadata.hummock_version = HummockVersion::from_persisted_protobuf(&get_1(&mut buf)?);
120 }
121 metadata.$name = get_n(&mut buf)?;
122 _idx += 1;
123 )*
124 Ok(())
125 }
126 };
127}
128
129for_all_metadata_models_v2!(define_decode_metadata);
130
131impl Display for MetadataV2 {
132 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
133 writeln!(f, "clusters: {:#?}", self.clusters)?;
134 writeln!(
135 f,
136 "Hummock version: id {}, committed_epoch: {:?}",
137 self.hummock_version.id,
138 self.hummock_version.state_table_info.info(),
139 )?;
140 Ok(())
142 }
143}
144
145impl Metadata for MetadataV2 {
146 fn encode_to(&self, buf: &mut Vec<u8>) -> BackupResult<()> {
147 encode_metadata(self, buf)
148 }
149
150 fn decode(buf: &[u8]) -> BackupResult<Self>
151 where
152 Self: Sized,
153 {
154 let mut metadata = Self::default();
155 decode_metadata(&mut metadata, buf)?;
156 Ok(metadata)
157 }
158
159 fn hummock_version_ref(&self) -> &HummockVersion {
160 &self.hummock_version
161 }
162
163 fn hummock_version(self) -> HummockVersion {
164 self.hummock_version
165 }
166}
167
168fn put_n<T: Serialize>(buf: &mut Vec<u8>, data: &[T]) -> Result<(), serde_json::Error> {
169 buf.put_u32_le(
170 data.len()
171 .try_into()
172 .unwrap_or_else(|_| panic!("cannot convert {} into u32", data.len())),
173 );
174 for d in data {
175 put_with_len_prefix(buf, d)?;
176 }
177 Ok(())
178}
179
180fn put_1<T: Serialize>(buf: &mut Vec<u8>, data: &T) -> Result<(), serde_json::Error> {
181 put_n(buf, &[data])
182}
183
184fn get_n<'a, T: Deserialize<'a>>(buf: &mut &'a [u8]) -> Result<Vec<T>, serde_json::Error> {
185 let n = buf.get_u32_le() as usize;
186 let mut elements = Vec::with_capacity(n);
187 for _ in 0..n {
188 elements.push(get_with_len_prefix(buf)?);
189 }
190 Ok(elements)
191}
192
193fn get_1<'a, T: Deserialize<'a>>(buf: &mut &'a [u8]) -> Result<T, serde_json::Error> {
194 let elements = get_n(buf)?;
195 assert_eq!(elements.len(), 1);
196 Ok(elements.into_iter().next().unwrap())
197}
198
199fn put_with_len_prefix<T: Serialize>(buf: &mut Vec<u8>, data: &T) -> Result<(), serde_json::Error> {
200 let b = serde_json::to_vec(data)?;
201 buf.put_u32_le(
202 b.len()
203 .try_into()
204 .unwrap_or_else(|_| panic!("cannot convert {} into u32", b.len())),
205 );
206 buf.put_slice(&b);
207 Ok(())
208}
209
210fn get_with_len_prefix<'a, T: Deserialize<'a>>(buf: &mut &'a [u8]) -> Result<T, serde_json::Error> {
211 let len = buf.get_u32_le() as usize;
212 let d = serde_json::from_slice(&buf[..len])?;
213 buf.advance(len);
214 Ok(d)
215}