risingwave_meta/hummock/manager/
transaction.rs1use std::collections::{BTreeMap, HashMap};
16use std::ops::{Deref, DerefMut};
17
18use parking_lot::Mutex;
19use risingwave_common::catalog::TableId;
20use risingwave_hummock_sdk::change_log::ChangeLogDelta;
21use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
22use risingwave_hummock_sdk::sstable_info::SstableInfo;
23use risingwave_hummock_sdk::table_watermark::TableWatermarks;
24use risingwave_hummock_sdk::version::{GroupDelta, HummockVersion, HummockVersionDelta};
25use risingwave_hummock_sdk::{CompactionGroupId, FrontendHummockVersionDelta, HummockVersionId};
26use risingwave_pb::hummock::{
27 CompatibilityVersion, GroupConstruct, HummockVersionDeltas, HummockVersionStats,
28 StateTableInfoDelta,
29};
30use risingwave_pb::meta::subscribe_response::{Info, Operation};
31
32use super::TableCommittedEpochNotifiers;
33use crate::hummock::model::CompactionGroup;
34use crate::manager::NotificationManager;
35use crate::model::{
36 InMemValTransaction, MetadataModelResult, Transactional, ValTransaction, VarTransaction,
37};
38use crate::rpc::metrics::MetaMetrics;
39
40fn trigger_delta_log_stats(metrics: &MetaMetrics, total_number: usize) {
41 metrics.delta_log_count.set(total_number as _);
42}
43
44fn trigger_version_stat(metrics: &MetaMetrics, current_version: &HummockVersion) {
45 metrics
46 .version_size
47 .set(current_version.estimated_encode_len() as i64);
48 metrics
49 .current_version_id
50 .set(current_version.id.to_u64() as i64);
51}
52
53pub(super) struct HummockVersionTransaction<'a> {
54 orig_version: &'a mut HummockVersion,
55 orig_deltas: &'a mut BTreeMap<HummockVersionId, HummockVersionDelta>,
56 notification_manager: &'a NotificationManager,
57 table_committed_epoch_notifiers: Option<&'a Mutex<TableCommittedEpochNotifiers>>,
58 meta_metrics: &'a MetaMetrics,
59
60 pre_applied_version: Option<(HummockVersion, Vec<HummockVersionDelta>)>,
61 disable_apply_to_txn: bool,
62}
63
64impl<'a> HummockVersionTransaction<'a> {
65 pub(super) fn new(
66 version: &'a mut HummockVersion,
67 deltas: &'a mut BTreeMap<HummockVersionId, HummockVersionDelta>,
68 notification_manager: &'a NotificationManager,
69 table_committed_epoch_notifiers: Option<&'a Mutex<TableCommittedEpochNotifiers>>,
70 meta_metrics: &'a MetaMetrics,
71 ) -> Self {
72 Self {
73 orig_version: version,
74 orig_deltas: deltas,
75 pre_applied_version: None,
76 disable_apply_to_txn: false,
77 notification_manager,
78 table_committed_epoch_notifiers,
79 meta_metrics,
80 }
81 }
82
83 pub(super) fn disable_apply_to_txn(&mut self) {
84 assert!(
85 self.pre_applied_version.is_none(),
86 "should only call disable at the beginning of txn"
87 );
88 self.disable_apply_to_txn = true;
89 }
90
91 pub(super) fn latest_version(&self) -> &HummockVersion {
92 if let Some((version, _)) = &self.pre_applied_version {
93 version
94 } else {
95 self.orig_version
96 }
97 }
98
99 pub(super) fn new_delta<'b>(&'b mut self) -> SingleDeltaTransaction<'a, 'b> {
100 let delta = self.latest_version().version_delta_after();
101 SingleDeltaTransaction {
102 version_txn: self,
103 delta: Some(delta),
104 }
105 }
106
107 fn pre_apply(&mut self, delta: HummockVersionDelta) {
108 let (version, deltas) = self
109 .pre_applied_version
110 .get_or_insert_with(|| (self.orig_version.clone(), Vec::with_capacity(1)));
111 version.apply_version_delta(&delta);
112 deltas.push(delta);
113 }
114
115 pub(super) fn pre_commit_epoch(
117 &mut self,
118 tables_to_commit: &HashMap<TableId, u64>,
119 new_compaction_groups: Vec<CompactionGroup>,
120 group_id_to_sub_levels: BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>>,
121 new_table_ids: &HashMap<TableId, CompactionGroupId>,
122 new_table_watermarks: HashMap<TableId, TableWatermarks>,
123 change_log_delta: HashMap<TableId, ChangeLogDelta>,
124 ) -> HummockVersionDelta {
125 let mut new_version_delta = self.new_delta();
126 new_version_delta.new_table_watermarks = new_table_watermarks;
127 new_version_delta.change_log_delta = change_log_delta;
128
129 for compaction_group in &new_compaction_groups {
130 let group_deltas = &mut new_version_delta
131 .group_deltas
132 .entry(compaction_group.group_id())
133 .or_default()
134 .group_deltas;
135
136 #[expect(deprecated)]
137 group_deltas.push(GroupDelta::GroupConstruct(Box::new(GroupConstruct {
138 group_config: Some(compaction_group.compaction_config().as_ref().clone()),
139 group_id: compaction_group.group_id(),
140 parent_group_id: StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId,
141 new_sst_start_id: 0, table_ids: vec![],
143 version: CompatibilityVersion::LATEST as _,
144 split_key: None,
145 })));
146 }
147
148 for (compaction_group_id, sub_levels) in group_id_to_sub_levels {
150 let group_deltas = &mut new_version_delta
151 .group_deltas
152 .entry(compaction_group_id)
153 .or_default()
154 .group_deltas;
155
156 for sub_level in sub_levels {
157 group_deltas.push(GroupDelta::NewL0SubLevel(sub_level));
158 }
159 }
160
161 new_version_delta.with_latest_version(|version, delta| {
163 for (table_id, cg_id) in new_table_ids {
164 assert!(
165 !version.state_table_info.info().contains_key(table_id),
166 "newly added table exists previously: {:?}",
167 table_id
168 );
169 let committed_epoch = *tables_to_commit.get(table_id).expect("newly added table must exist in tables_to_commit");
170 delta.state_table_info_delta.insert(
171 *table_id,
172 StateTableInfoDelta {
173 committed_epoch,
174 compaction_group_id: *cg_id,
175 },
176 );
177 }
178
179 for (table_id, committed_epoch) in tables_to_commit {
180 if new_table_ids.contains_key(table_id) {
181 continue;
182 }
183 let info = version.state_table_info.info().get(table_id).unwrap_or_else(|| {
184 panic!("tables_to_commit {:?} contains table_id {} that is not newly added but not exists previously", tables_to_commit, table_id);
185 });
186 assert!(delta
187 .state_table_info_delta
188 .insert(
189 *table_id,
190 StateTableInfoDelta {
191 committed_epoch: *committed_epoch,
192 compaction_group_id: info.compaction_group_id,
193 }
194 )
195 .is_none());
196 }
197 });
198
199 let time_travel_delta = (*new_version_delta).clone();
200 new_version_delta.pre_apply();
201 time_travel_delta
202 }
203}
204
205impl InMemValTransaction for HummockVersionTransaction<'_> {
206 fn commit(self) {
207 if let Some((version, deltas)) = self.pre_applied_version {
208 *self.orig_version = version;
209 if !self.disable_apply_to_txn {
210 let pb_deltas = deltas.iter().map(|delta| delta.to_protobuf()).collect();
211 self.notification_manager.notify_hummock_without_version(
212 Operation::Add,
213 Info::HummockVersionDeltas(risingwave_pb::hummock::HummockVersionDeltas {
214 version_deltas: pb_deltas,
215 }),
216 );
217 self.notification_manager.notify_frontend_without_version(
218 Operation::Update,
219 Info::HummockVersionDeltas(HummockVersionDeltas {
220 version_deltas: deltas
221 .iter()
222 .map(|delta| {
223 FrontendHummockVersionDelta::from_delta(delta).to_protobuf()
224 })
225 .collect(),
226 }),
227 );
228 if let Some(table_committed_epoch_notifiers) = self.table_committed_epoch_notifiers
229 {
230 table_committed_epoch_notifiers
231 .lock()
232 .notify_deltas(&deltas);
233 }
234 }
235 for delta in deltas {
236 assert!(self.orig_deltas.insert(delta.id, delta.clone()).is_none());
237 }
238
239 trigger_delta_log_stats(self.meta_metrics, self.orig_deltas.len());
240 trigger_version_stat(self.meta_metrics, self.orig_version);
241 }
242 }
243}
244
245impl<TXN> ValTransaction<TXN> for HummockVersionTransaction<'_>
246where
247 HummockVersionDelta: Transactional<TXN>,
248 HummockVersionStats: Transactional<TXN>,
249{
250 async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
251 if self.disable_apply_to_txn {
252 return Ok(());
253 }
254 for delta in self
255 .pre_applied_version
256 .iter()
257 .flat_map(|(_, deltas)| deltas.iter())
258 {
259 delta.upsert_in_transaction(txn).await?;
260 }
261 Ok(())
262 }
263}
264
265pub(super) struct SingleDeltaTransaction<'a, 'b> {
266 version_txn: &'b mut HummockVersionTransaction<'a>,
267 delta: Option<HummockVersionDelta>,
268}
269
270impl SingleDeltaTransaction<'_, '_> {
271 pub(super) fn latest_version(&self) -> &HummockVersion {
272 self.version_txn.latest_version()
273 }
274
275 pub(super) fn pre_apply(mut self) {
276 self.version_txn.pre_apply(self.delta.take().unwrap());
277 }
278
279 pub(super) fn with_latest_version(
280 &mut self,
281 f: impl FnOnce(&HummockVersion, &mut HummockVersionDelta),
282 ) {
283 f(
284 self.version_txn.latest_version(),
285 self.delta.as_mut().expect("should exist"),
286 )
287 }
288}
289
290impl Deref for SingleDeltaTransaction<'_, '_> {
291 type Target = HummockVersionDelta;
292
293 fn deref(&self) -> &Self::Target {
294 self.delta.as_ref().expect("should exist")
295 }
296}
297
298impl DerefMut for SingleDeltaTransaction<'_, '_> {
299 fn deref_mut(&mut self) -> &mut Self::Target {
300 self.delta.as_mut().expect("should exist")
301 }
302}
303
304impl Drop for SingleDeltaTransaction<'_, '_> {
305 fn drop(&mut self) {
306 if let Some(delta) = self.delta.take() {
307 self.version_txn.pre_apply(delta);
308 }
309 }
310}
311
312pub(super) struct HummockVersionStatsTransaction<'a> {
313 stats: VarTransaction<'a, HummockVersionStats>,
314 notification_manager: &'a NotificationManager,
315}
316
317impl<'a> HummockVersionStatsTransaction<'a> {
318 pub(super) fn new(
319 stats: &'a mut HummockVersionStats,
320 notification_manager: &'a NotificationManager,
321 ) -> Self {
322 Self {
323 stats: VarTransaction::new(stats),
324 notification_manager,
325 }
326 }
327}
328
329impl InMemValTransaction for HummockVersionStatsTransaction<'_> {
330 fn commit(self) {
331 if self.stats.has_new_value() {
332 let stats = self.stats.clone();
333 self.stats.commit();
334 self.notification_manager
335 .notify_frontend_without_version(Operation::Update, Info::HummockStats(stats));
336 }
337 }
338}
339
340impl<TXN> ValTransaction<TXN> for HummockVersionStatsTransaction<'_>
341where
342 HummockVersionStats: Transactional<TXN>,
343{
344 async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
345 self.stats.apply_to_txn(txn).await
346 }
347}
348
349impl Deref for HummockVersionStatsTransaction<'_> {
350 type Target = HummockVersionStats;
351
352 fn deref(&self) -> &Self::Target {
353 self.stats.deref()
354 }
355}
356
357impl DerefMut for HummockVersionStatsTransaction<'_> {
358 fn deref_mut(&mut self) -> &mut Self::Target {
359 self.stats.deref_mut()
360 }
361}