risingwave_meta/hummock/manager/
transaction.rs1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ops::{Deref, DerefMut};
17
18use parking_lot::Mutex;
19use risingwave_common::catalog::TableId;
20use risingwave_hummock_sdk::change_log::ChangeLogDelta;
21use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
22use risingwave_hummock_sdk::sstable_info::SstableInfo;
23use risingwave_hummock_sdk::table_watermark::TableWatermarks;
24use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
25use risingwave_hummock_sdk::version::{GroupDelta, HummockVersion, HummockVersionDelta};
26use risingwave_hummock_sdk::{
27 CompactionGroupId, FrontendHummockVersionDelta, HummockSstableId, HummockVersionId,
28};
29use risingwave_pb::hummock::{
30 CompatibilityVersion, GroupConstruct, HummockVersionDeltas, HummockVersionStats,
31 StateTableInfoDelta,
32};
33use risingwave_pb::meta::subscribe_response::{Info, Operation};
34
35use super::TableCommittedEpochNotifiers;
36use crate::hummock::model::CompactionGroup;
37use crate::manager::NotificationManager;
38use crate::model::{
39 InMemValTransaction, MetadataModelResult, Transactional, ValTransaction, VarTransaction,
40};
41use crate::rpc::metrics::MetaMetrics;
42
43fn trigger_delta_log_stats(metrics: &MetaMetrics, total_number: usize) {
44 metrics.delta_log_count.set(total_number as _);
45}
46
47fn trigger_version_stat(metrics: &MetaMetrics, current_version: &HummockVersion) {
48 metrics
49 .version_size
50 .set(current_version.estimated_encode_len() as i64);
51 metrics
52 .current_version_id
53 .set(current_version.id.as_i64_id());
54}
55
56pub(super) struct HummockVersionTransaction<'a> {
57 orig_version: &'a mut HummockVersion,
58 orig_deltas: &'a mut BTreeMap<HummockVersionId, HummockVersionDelta>,
59 notification_manager: &'a NotificationManager,
60 table_committed_epoch_notifiers: Option<&'a Mutex<TableCommittedEpochNotifiers>>,
61 meta_metrics: &'a MetaMetrics,
62
63 pre_applied_version: Option<(HummockVersion, Vec<HummockVersionDelta>)>,
64 disable_apply_to_txn: bool,
65}
66
67impl<'a> HummockVersionTransaction<'a> {
68 pub(super) fn new(
69 version: &'a mut HummockVersion,
70 deltas: &'a mut BTreeMap<HummockVersionId, HummockVersionDelta>,
71 notification_manager: &'a NotificationManager,
72 table_committed_epoch_notifiers: Option<&'a Mutex<TableCommittedEpochNotifiers>>,
73 meta_metrics: &'a MetaMetrics,
74 ) -> Self {
75 Self {
76 orig_version: version,
77 orig_deltas: deltas,
78 pre_applied_version: None,
79 disable_apply_to_txn: false,
80 notification_manager,
81 table_committed_epoch_notifiers,
82 meta_metrics,
83 }
84 }
85
86 pub(super) fn disable_apply_to_txn(&mut self) {
87 assert!(
88 self.pre_applied_version.is_none(),
89 "should only call disable at the beginning of txn"
90 );
91 self.disable_apply_to_txn = true;
92 }
93
94 pub(super) fn latest_version(&self) -> &HummockVersion {
95 if let Some((version, _)) = &self.pre_applied_version {
96 version
97 } else {
98 self.orig_version
99 }
100 }
101
102 pub(super) fn new_delta<'b>(&'b mut self) -> SingleDeltaTransaction<'a, 'b> {
103 let delta = self.latest_version().version_delta_after();
104 SingleDeltaTransaction {
105 version_txn: self,
106 delta: Some(delta),
107 }
108 }
109
110 fn pre_apply(&mut self, delta: HummockVersionDelta) {
111 let (version, deltas) = self
112 .pre_applied_version
113 .get_or_insert_with(|| (self.orig_version.clone(), Vec::with_capacity(1)));
114 version.apply_version_delta(&delta);
115 deltas.push(delta);
116 }
117
118 pub(super) fn pre_commit_epoch(
120 &mut self,
121 tables_to_commit: &HashMap<TableId, u64>,
122 new_compaction_groups: Vec<CompactionGroup>,
123 group_id_to_sub_levels: BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>>,
124 new_table_ids: &HashMap<TableId, CompactionGroupId>,
125 new_table_watermarks: HashMap<TableId, TableWatermarks>,
126 change_log_delta: HashMap<TableId, ChangeLogDelta>,
127 vector_index_delta: HashMap<TableId, VectorIndexDelta>,
128 group_id_to_truncate_tables: HashMap<CompactionGroupId, HashSet<TableId>>,
129 ) -> HummockVersionDelta {
130 let mut new_version_delta = self.new_delta();
131 new_version_delta.new_table_watermarks = new_table_watermarks;
132 new_version_delta.change_log_delta = change_log_delta;
133 new_version_delta.vector_index_delta = vector_index_delta;
134
135 for compaction_group in &new_compaction_groups {
136 let group_deltas = &mut new_version_delta
137 .group_deltas
138 .entry(compaction_group.group_id())
139 .or_default()
140 .group_deltas;
141
142 #[expect(deprecated)]
143 group_deltas.push(GroupDelta::GroupConstruct(Box::new(GroupConstruct {
144 group_config: Some(compaction_group.compaction_config().as_ref().clone()),
145 group_id: compaction_group.group_id(),
146 parent_group_id: StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId,
147 new_sst_start_id: HummockSstableId::default(), table_ids: vec![],
149 version: CompatibilityVersion::LATEST as _,
150 split_key: None,
151 })));
152 }
153
154 for (compaction_group_id, sub_levels) in group_id_to_sub_levels {
156 let group_deltas = &mut new_version_delta
157 .group_deltas
158 .entry(compaction_group_id)
159 .or_default()
160 .group_deltas;
161
162 for sub_level in sub_levels {
163 group_deltas.push(GroupDelta::NewL0SubLevel(sub_level));
164 }
165 }
166
167 for (compaction_group_id, table_ids) in group_id_to_truncate_tables {
168 let group_deltas = &mut new_version_delta
169 .group_deltas
170 .entry(compaction_group_id)
171 .or_default()
172 .group_deltas;
173
174 group_deltas.push(GroupDelta::TruncateTables(table_ids.into_iter().collect()));
175 }
176
177 new_version_delta.with_latest_version(|version, delta| {
179 for (table_id, cg_id) in new_table_ids {
180 assert!(
181 !version.state_table_info.info().contains_key(table_id),
182 "newly added table exists previously: {:?}",
183 table_id
184 );
185 let committed_epoch = *tables_to_commit.get(table_id).expect("newly added table must exist in tables_to_commit");
186 delta.state_table_info_delta.insert(
187 *table_id,
188 StateTableInfoDelta {
189 committed_epoch,
190 compaction_group_id: *cg_id,
191 },
192 );
193 }
194
195 for (table_id, committed_epoch) in tables_to_commit {
196 if new_table_ids.contains_key(table_id) {
197 continue;
198 }
199 let info = version.state_table_info.info().get(table_id).unwrap_or_else(|| {
200 panic!("tables_to_commit {:?} contains table_id {} that is not newly added but not exists previously", tables_to_commit, table_id);
201 });
202 assert!(delta
203 .state_table_info_delta
204 .insert(
205 *table_id,
206 StateTableInfoDelta {
207 committed_epoch: *committed_epoch,
208 compaction_group_id: info.compaction_group_id,
209 }
210 )
211 .is_none());
212 }
213 });
214
215 let time_travel_delta = (*new_version_delta).clone();
216 new_version_delta.pre_apply();
217 time_travel_delta
218 }
219}
220
221impl InMemValTransaction for HummockVersionTransaction<'_> {
222 fn commit(self) {
223 if let Some((version, deltas)) = self.pre_applied_version {
224 *self.orig_version = version;
225 if !self.disable_apply_to_txn {
226 let pb_deltas = deltas.iter().map(|delta| delta.to_protobuf()).collect();
227 self.notification_manager.notify_hummock_without_version(
228 Operation::Add,
229 Info::HummockVersionDeltas(risingwave_pb::hummock::HummockVersionDeltas {
230 version_deltas: pb_deltas,
231 }),
232 );
233 self.notification_manager.notify_frontend_without_version(
234 Operation::Update,
235 Info::HummockVersionDeltas(HummockVersionDeltas {
236 version_deltas: deltas
237 .iter()
238 .map(|delta| {
239 FrontendHummockVersionDelta::from_delta(delta).to_protobuf()
240 })
241 .collect(),
242 }),
243 );
244 if let Some(table_committed_epoch_notifiers) = self.table_committed_epoch_notifiers
245 {
246 table_committed_epoch_notifiers
247 .lock()
248 .notify_deltas(&deltas);
249 }
250 }
251 for delta in deltas {
252 assert!(self.orig_deltas.insert(delta.id, delta.clone()).is_none());
253 }
254
255 trigger_delta_log_stats(self.meta_metrics, self.orig_deltas.len());
256 trigger_version_stat(self.meta_metrics, self.orig_version);
257 }
258 }
259}
260
261impl<TXN> ValTransaction<TXN> for HummockVersionTransaction<'_>
262where
263 HummockVersionDelta: Transactional<TXN>,
264 HummockVersionStats: Transactional<TXN>,
265{
266 async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
267 if self.disable_apply_to_txn {
268 return Ok(());
269 }
270 for delta in self
271 .pre_applied_version
272 .iter()
273 .flat_map(|(_, deltas)| deltas.iter())
274 {
275 delta.upsert_in_transaction(txn).await?;
276 }
277 Ok(())
278 }
279}
280
281pub(super) struct SingleDeltaTransaction<'a, 'b> {
282 version_txn: &'b mut HummockVersionTransaction<'a>,
283 delta: Option<HummockVersionDelta>,
284}
285
286impl SingleDeltaTransaction<'_, '_> {
287 pub(super) fn latest_version(&self) -> &HummockVersion {
288 self.version_txn.latest_version()
289 }
290
291 pub(super) fn pre_apply(mut self) {
292 self.version_txn.pre_apply(self.delta.take().unwrap());
293 }
294
295 pub(super) fn with_latest_version(
296 &mut self,
297 f: impl FnOnce(&HummockVersion, &mut HummockVersionDelta),
298 ) {
299 f(
300 self.version_txn.latest_version(),
301 self.delta.as_mut().expect("should exist"),
302 )
303 }
304}
305
306impl Deref for SingleDeltaTransaction<'_, '_> {
307 type Target = HummockVersionDelta;
308
309 fn deref(&self) -> &Self::Target {
310 self.delta.as_ref().expect("should exist")
311 }
312}
313
314impl DerefMut for SingleDeltaTransaction<'_, '_> {
315 fn deref_mut(&mut self) -> &mut Self::Target {
316 self.delta.as_mut().expect("should exist")
317 }
318}
319
320impl Drop for SingleDeltaTransaction<'_, '_> {
321 fn drop(&mut self) {
322 if let Some(delta) = self.delta.take() {
323 self.version_txn.pre_apply(delta);
324 }
325 }
326}
327
328pub(super) struct HummockVersionStatsTransaction<'a> {
329 stats: VarTransaction<'a, HummockVersionStats>,
330 notification_manager: &'a NotificationManager,
331}
332
333impl<'a> HummockVersionStatsTransaction<'a> {
334 pub(super) fn new(
335 stats: &'a mut HummockVersionStats,
336 notification_manager: &'a NotificationManager,
337 ) -> Self {
338 Self {
339 stats: VarTransaction::new(stats),
340 notification_manager,
341 }
342 }
343}
344
345impl InMemValTransaction for HummockVersionStatsTransaction<'_> {
346 fn commit(self) {
347 if self.stats.has_new_value() {
348 let stats = self.stats.clone();
349 self.stats.commit();
350 self.notification_manager
351 .notify_frontend_without_version(Operation::Update, Info::HummockStats(stats));
352 }
353 }
354}
355
356impl<TXN> ValTransaction<TXN> for HummockVersionStatsTransaction<'_>
357where
358 HummockVersionStats: Transactional<TXN>,
359{
360 async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
361 self.stats.apply_to_txn(txn).await
362 }
363}
364
365impl Deref for HummockVersionStatsTransaction<'_> {
366 type Target = HummockVersionStats;
367
368 fn deref(&self) -> &Self::Target {
369 self.stats.deref()
370 }
371}
372
373impl DerefMut for HummockVersionStatsTransaction<'_> {
374 fn deref_mut(&mut self) -> &mut Self::Target {
375 self.stats.deref_mut()
376 }
377}