risingwave_meta/hummock/manager/
transaction.rs1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ops::{Deref, DerefMut};
17
18use parking_lot::Mutex;
19use risingwave_common::catalog::TableId;
20use risingwave_hummock_sdk::change_log::ChangeLogDelta;
21use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
22use risingwave_hummock_sdk::sstable_info::SstableInfo;
23use risingwave_hummock_sdk::table_watermark::TableWatermarks;
24use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
25use risingwave_hummock_sdk::version::{GroupDelta, HummockVersion, HummockVersionDelta};
26use risingwave_hummock_sdk::{CompactionGroupId, FrontendHummockVersionDelta, HummockVersionId};
27use risingwave_pb::hummock::{
28 CompatibilityVersion, GroupConstruct, HummockVersionDeltas, HummockVersionStats,
29 StateTableInfoDelta,
30};
31use risingwave_pb::meta::subscribe_response::{Info, Operation};
32
33use super::TableCommittedEpochNotifiers;
34use crate::hummock::model::CompactionGroup;
35use crate::manager::NotificationManager;
36use crate::model::{
37 InMemValTransaction, MetadataModelResult, Transactional, ValTransaction, VarTransaction,
38};
39use crate::rpc::metrics::MetaMetrics;
40
41fn trigger_delta_log_stats(metrics: &MetaMetrics, total_number: usize) {
42 metrics.delta_log_count.set(total_number as _);
43}
44
45fn trigger_version_stat(metrics: &MetaMetrics, current_version: &HummockVersion) {
46 metrics
47 .version_size
48 .set(current_version.estimated_encode_len() as i64);
49 metrics
50 .current_version_id
51 .set(current_version.id.to_u64() as i64);
52}
53
/// An in-memory transaction over a [`HummockVersion`] and its log of version deltas.
///
/// Deltas are first applied to a private copy of the version (`pre_applied_version`). The
/// borrowed originals are only overwritten when the transaction is committed.
pub(super) struct HummockVersionTransaction<'a> {
    /// The version the transaction was opened on; replaced by the staged copy on commit.
    orig_version: &'a mut HummockVersion,
    /// The delta log keyed by delta id; every staged delta is inserted on commit.
    orig_deltas: &'a mut BTreeMap<HummockVersionId, HummockVersionDelta>,
    /// Used on commit to send the staged deltas to subscribers.
    notification_manager: &'a NotificationManager,
    /// Optional notifiers that are handed the staged deltas on commit.
    table_committed_epoch_notifiers: Option<&'a Mutex<TableCommittedEpochNotifiers>>,
    /// Metrics updated on commit (delta log size, version size, version id).
    meta_metrics: &'a MetaMetrics,

    /// Copy of the version with every staged delta applied, together with those deltas in
    /// the order they were staged. `None` until the first delta is staged.
    pre_applied_version: Option<(HummockVersion, Vec<HummockVersionDelta>)>,
    /// When `true`, `apply_to_txn` does nothing and commit skips all notifications.
    disable_apply_to_txn: bool,
}
64
65impl<'a> HummockVersionTransaction<'a> {
66 pub(super) fn new(
67 version: &'a mut HummockVersion,
68 deltas: &'a mut BTreeMap<HummockVersionId, HummockVersionDelta>,
69 notification_manager: &'a NotificationManager,
70 table_committed_epoch_notifiers: Option<&'a Mutex<TableCommittedEpochNotifiers>>,
71 meta_metrics: &'a MetaMetrics,
72 ) -> Self {
73 Self {
74 orig_version: version,
75 orig_deltas: deltas,
76 pre_applied_version: None,
77 disable_apply_to_txn: false,
78 notification_manager,
79 table_committed_epoch_notifiers,
80 meta_metrics,
81 }
82 }
83
84 pub(super) fn disable_apply_to_txn(&mut self) {
85 assert!(
86 self.pre_applied_version.is_none(),
87 "should only call disable at the beginning of txn"
88 );
89 self.disable_apply_to_txn = true;
90 }
91
92 pub(super) fn latest_version(&self) -> &HummockVersion {
93 if let Some((version, _)) = &self.pre_applied_version {
94 version
95 } else {
96 self.orig_version
97 }
98 }
99
100 pub(super) fn new_delta<'b>(&'b mut self) -> SingleDeltaTransaction<'a, 'b> {
101 let delta = self.latest_version().version_delta_after();
102 SingleDeltaTransaction {
103 version_txn: self,
104 delta: Some(delta),
105 }
106 }
107
108 fn pre_apply(&mut self, delta: HummockVersionDelta) {
109 let (version, deltas) = self
110 .pre_applied_version
111 .get_or_insert_with(|| (self.orig_version.clone(), Vec::with_capacity(1)));
112 version.apply_version_delta(&delta);
113 deltas.push(delta);
114 }
115
116 pub(super) fn pre_commit_epoch(
118 &mut self,
119 tables_to_commit: &HashMap<TableId, u64>,
120 new_compaction_groups: Vec<CompactionGroup>,
121 group_id_to_sub_levels: BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>>,
122 new_table_ids: &HashMap<TableId, CompactionGroupId>,
123 new_table_watermarks: HashMap<TableId, TableWatermarks>,
124 change_log_delta: HashMap<TableId, ChangeLogDelta>,
125 vector_index_delta: HashMap<TableId, VectorIndexDelta>,
126 group_id_to_truncate_tables: HashMap<CompactionGroupId, HashSet<TableId>>,
127 ) -> HummockVersionDelta {
128 let mut new_version_delta = self.new_delta();
129 new_version_delta.new_table_watermarks = new_table_watermarks;
130 new_version_delta.change_log_delta = change_log_delta;
131 new_version_delta.vector_index_delta = vector_index_delta;
132
133 for compaction_group in &new_compaction_groups {
134 let group_deltas = &mut new_version_delta
135 .group_deltas
136 .entry(compaction_group.group_id())
137 .or_default()
138 .group_deltas;
139
140 #[expect(deprecated)]
141 group_deltas.push(GroupDelta::GroupConstruct(Box::new(GroupConstruct {
142 group_config: Some(compaction_group.compaction_config().as_ref().clone()),
143 group_id: compaction_group.group_id(),
144 parent_group_id: StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId,
145 new_sst_start_id: 0, table_ids: vec![],
147 version: CompatibilityVersion::LATEST as _,
148 split_key: None,
149 })));
150 }
151
152 for (compaction_group_id, sub_levels) in group_id_to_sub_levels {
154 let group_deltas = &mut new_version_delta
155 .group_deltas
156 .entry(compaction_group_id)
157 .or_default()
158 .group_deltas;
159
160 for sub_level in sub_levels {
161 group_deltas.push(GroupDelta::NewL0SubLevel(sub_level));
162 }
163 }
164
165 for (compaction_group_id, table_ids) in group_id_to_truncate_tables {
166 let group_deltas = &mut new_version_delta
167 .group_deltas
168 .entry(compaction_group_id)
169 .or_default()
170 .group_deltas;
171
172 group_deltas.push(GroupDelta::TruncateTables(table_ids.into_iter().collect()));
173 }
174
175 new_version_delta.with_latest_version(|version, delta| {
177 for (table_id, cg_id) in new_table_ids {
178 assert!(
179 !version.state_table_info.info().contains_key(table_id),
180 "newly added table exists previously: {:?}",
181 table_id
182 );
183 let committed_epoch = *tables_to_commit.get(table_id).expect("newly added table must exist in tables_to_commit");
184 delta.state_table_info_delta.insert(
185 *table_id,
186 StateTableInfoDelta {
187 committed_epoch,
188 compaction_group_id: *cg_id,
189 },
190 );
191 }
192
193 for (table_id, committed_epoch) in tables_to_commit {
194 if new_table_ids.contains_key(table_id) {
195 continue;
196 }
197 let info = version.state_table_info.info().get(table_id).unwrap_or_else(|| {
198 panic!("tables_to_commit {:?} contains table_id {} that is not newly added but not exists previously", tables_to_commit, table_id);
199 });
200 assert!(delta
201 .state_table_info_delta
202 .insert(
203 *table_id,
204 StateTableInfoDelta {
205 committed_epoch: *committed_epoch,
206 compaction_group_id: info.compaction_group_id,
207 }
208 )
209 .is_none());
210 }
211 });
212
213 let time_travel_delta = (*new_version_delta).clone();
214 new_version_delta.pre_apply();
215 time_travel_delta
216 }
217}
218
219impl InMemValTransaction for HummockVersionTransaction<'_> {
220 fn commit(self) {
221 if let Some((version, deltas)) = self.pre_applied_version {
222 *self.orig_version = version;
223 if !self.disable_apply_to_txn {
224 let pb_deltas = deltas.iter().map(|delta| delta.to_protobuf()).collect();
225 self.notification_manager.notify_hummock_without_version(
226 Operation::Add,
227 Info::HummockVersionDeltas(risingwave_pb::hummock::HummockVersionDeltas {
228 version_deltas: pb_deltas,
229 }),
230 );
231 self.notification_manager.notify_frontend_without_version(
232 Operation::Update,
233 Info::HummockVersionDeltas(HummockVersionDeltas {
234 version_deltas: deltas
235 .iter()
236 .map(|delta| {
237 FrontendHummockVersionDelta::from_delta(delta).to_protobuf()
238 })
239 .collect(),
240 }),
241 );
242 if let Some(table_committed_epoch_notifiers) = self.table_committed_epoch_notifiers
243 {
244 table_committed_epoch_notifiers
245 .lock()
246 .notify_deltas(&deltas);
247 }
248 }
249 for delta in deltas {
250 assert!(self.orig_deltas.insert(delta.id, delta.clone()).is_none());
251 }
252
253 trigger_delta_log_stats(self.meta_metrics, self.orig_deltas.len());
254 trigger_version_stat(self.meta_metrics, self.orig_version);
255 }
256 }
257}
258
259impl<TXN> ValTransaction<TXN> for HummockVersionTransaction<'_>
260where
261 HummockVersionDelta: Transactional<TXN>,
262 HummockVersionStats: Transactional<TXN>,
263{
264 async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
265 if self.disable_apply_to_txn {
266 return Ok(());
267 }
268 for delta in self
269 .pre_applied_version
270 .iter()
271 .flat_map(|(_, deltas)| deltas.iter())
272 {
273 delta.upsert_in_transaction(txn).await?;
274 }
275 Ok(())
276 }
277}
278
/// A single [`HummockVersionDelta`] under construction on top of a
/// [`HummockVersionTransaction`].
///
/// The delta is staged into the parent transaction either explicitly through `pre_apply`
/// or, if that was never called, when this value is dropped.
pub(super) struct SingleDeltaTransaction<'a, 'b> {
    /// Parent transaction that receives the delta once it is staged.
    version_txn: &'b mut HummockVersionTransaction<'a>,
    /// The delta being built; `None` once it has been handed to the parent.
    delta: Option<HummockVersionDelta>,
}
283
284impl SingleDeltaTransaction<'_, '_> {
285 pub(super) fn latest_version(&self) -> &HummockVersion {
286 self.version_txn.latest_version()
287 }
288
289 pub(super) fn pre_apply(mut self) {
290 self.version_txn.pre_apply(self.delta.take().unwrap());
291 }
292
293 pub(super) fn with_latest_version(
294 &mut self,
295 f: impl FnOnce(&HummockVersion, &mut HummockVersionDelta),
296 ) {
297 f(
298 self.version_txn.latest_version(),
299 self.delta.as_mut().expect("should exist"),
300 )
301 }
302}
303
304impl Deref for SingleDeltaTransaction<'_, '_> {
305 type Target = HummockVersionDelta;
306
307 fn deref(&self) -> &Self::Target {
308 self.delta.as_ref().expect("should exist")
309 }
310}
311
312impl DerefMut for SingleDeltaTransaction<'_, '_> {
313 fn deref_mut(&mut self) -> &mut Self::Target {
314 self.delta.as_mut().expect("should exist")
315 }
316}
317
318impl Drop for SingleDeltaTransaction<'_, '_> {
319 fn drop(&mut self) {
320 if let Some(delta) = self.delta.take() {
321 self.version_txn.pre_apply(delta);
322 }
323 }
324}
325
/// A transaction over [`HummockVersionStats`] that notifies the frontend when a changed
/// value is committed.
pub(super) struct HummockVersionStatsTransaction<'a> {
    /// Inner transaction holding the stats value.
    stats: VarTransaction<'a, HummockVersionStats>,
    /// Used on commit to send the updated stats.
    notification_manager: &'a NotificationManager,
}
330
331impl<'a> HummockVersionStatsTransaction<'a> {
332 pub(super) fn new(
333 stats: &'a mut HummockVersionStats,
334 notification_manager: &'a NotificationManager,
335 ) -> Self {
336 Self {
337 stats: VarTransaction::new(stats),
338 notification_manager,
339 }
340 }
341}
342
343impl InMemValTransaction for HummockVersionStatsTransaction<'_> {
344 fn commit(self) {
345 if self.stats.has_new_value() {
346 let stats = self.stats.clone();
347 self.stats.commit();
348 self.notification_manager
349 .notify_frontend_without_version(Operation::Update, Info::HummockStats(stats));
350 }
351 }
352}
353
354impl<TXN> ValTransaction<TXN> for HummockVersionStatsTransaction<'_>
355where
356 HummockVersionStats: Transactional<TXN>,
357{
358 async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
359 self.stats.apply_to_txn(txn).await
360 }
361}
362
363impl Deref for HummockVersionStatsTransaction<'_> {
364 type Target = HummockVersionStats;
365
366 fn deref(&self) -> &Self::Target {
367 self.stats.deref()
368 }
369}
370
371impl DerefMut for HummockVersionStatsTransaction<'_> {
372 fn deref_mut(&mut self) -> &mut Self::Target {
373 self.stats.deref_mut()
374 }
375}