1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ops::{Deref, DerefMut};
17
18use parking_lot::Mutex;
19use risingwave_common::catalog::TableId;
20use risingwave_hummock_sdk::change_log::{ChangeLogDelta, TableChangeLog};
21use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
22use risingwave_hummock_sdk::sstable_info::SstableInfo;
23use risingwave_hummock_sdk::table_watermark::TableWatermarks;
24use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
25use risingwave_hummock_sdk::version::{GroupDelta, HummockVersion, HummockVersionDelta};
26use risingwave_hummock_sdk::{
27 CompactionGroupId, FrontendHummockVersionDelta, HummockSstableId, HummockVersionId,
28};
29use risingwave_meta_model::Epoch;
30use risingwave_pb::hummock::{
31 CompatibilityVersion, GroupConstruct, HummockVersionDeltas, HummockVersionStats,
32 StateTableInfoDelta,
33};
34use risingwave_pb::meta::subscribe_response::{Info, Operation};
35use sea_orm::{ConnectionTrait, EntityTrait};
36
37use super::TableCommittedEpochNotifiers;
38use crate::hummock::model::CompactionGroup;
39use crate::hummock::model::ext::to_table_change_log_meta_store_model;
40use crate::manager::{MetaOpts, NotificationManager};
41use crate::model::{
42 InMemValTransaction, MetadataModelResult, Transactional, ValTransaction, VarTransaction,
43};
44use crate::rpc::metrics::MetaMetrics;
45
46fn trigger_delta_log_stats(metrics: &MetaMetrics, total_number: usize) {
47 metrics.delta_log_count.set(total_number as _);
48}
49
50fn trigger_version_stat(metrics: &MetaMetrics, current_version: &HummockVersion) {
51 metrics
52 .version_size
53 .set(current_version.estimated_encode_len() as i64);
54 metrics
55 .current_version_id
56 .set(current_version.id.as_i64_id());
57}
58
/// In-memory transaction over the current Hummock version and its delta log.
///
/// Deltas are first staged against a private copy of the version
/// (`pre_applied_version`). The caller-owned state behind the `orig_*`
/// references is only modified by `InMemValTransaction::commit`.
pub(super) struct HummockVersionTransaction<'a> {
    /// Caller-owned version; overwritten with the staged version on commit.
    orig_version: &'a mut HummockVersion,
    /// Caller-owned delta log, keyed by version id; staged deltas are inserted on commit.
    orig_deltas: &'a mut BTreeMap<HummockVersionId, HummockVersionDelta>,
    /// Caller-owned per-table change log; updated and garbage-collected on commit.
    orig_table_change_log: &'a mut HashMap<TableId, TableChangeLog>,
    /// Used on commit to broadcast the staged deltas.
    notification_manager: &'a NotificationManager,
    /// Optional notifiers that are told about the staged deltas on commit.
    table_committed_epoch_notifiers: Option<&'a Mutex<TableCommittedEpochNotifiers>>,
    /// Metrics updated after a successful commit.
    meta_metrics: &'a MetaMetrics,

    /// Staged state, created lazily by the first pre-applied delta:
    /// the staged version, the deltas applied to it (in order), and the ids of
    /// tables whose change log must be dropped entirely.
    pre_applied_version: Option<(HummockVersion, Vec<HummockVersionDelta>, HashSet<TableId>)>,
    /// When `true`, `apply_to_txn` is a no-op and `commit` sends no notifications.
    disable_apply_to_txn: bool,
    /// Meta options; supplies the change-log insert/delete batch sizes.
    opts: &'a MetaOpts,
}
71
72impl<'a> HummockVersionTransaction<'a> {
73 pub(super) fn new(
74 version: &'a mut HummockVersion,
75 deltas: &'a mut BTreeMap<HummockVersionId, HummockVersionDelta>,
76 table_change_log: &'a mut HashMap<TableId, TableChangeLog>,
77 notification_manager: &'a NotificationManager,
78 table_committed_epoch_notifiers: Option<&'a Mutex<TableCommittedEpochNotifiers>>,
79 meta_metrics: &'a MetaMetrics,
80 opts: &'a MetaOpts,
81 ) -> Self {
82 Self {
83 orig_version: version,
84 orig_deltas: deltas,
85 orig_table_change_log: table_change_log,
86 pre_applied_version: None,
87 disable_apply_to_txn: false,
88 notification_manager,
89 table_committed_epoch_notifiers,
90 meta_metrics,
91 opts,
92 }
93 }
94
95 pub(super) fn disable_apply_to_txn(&mut self) {
96 assert!(
97 self.pre_applied_version.is_none(),
98 "should only call disable at the beginning of txn"
99 );
100 self.disable_apply_to_txn = true;
101 }
102
103 pub(super) fn latest_version(&self) -> &HummockVersion {
104 if let Some((version, _, _)) = &self.pre_applied_version {
105 version
106 } else {
107 self.orig_version
108 }
109 }
110
111 pub(super) fn new_delta<'b>(&'b mut self) -> SingleDeltaTransaction<'a, 'b> {
112 let delta = self.latest_version().version_delta_after();
113 SingleDeltaTransaction {
114 version_txn: self,
115 delta: Some(delta),
116 }
117 }
118
119 fn pre_apply(&mut self, delta: HummockVersionDelta) {
120 let (version, deltas, gc_change_log_deltas) =
121 self.pre_applied_version.get_or_insert_with(|| {
122 (
123 self.orig_version.clone(),
124 Vec::with_capacity(1),
125 HashSet::new(),
126 )
127 });
128 let changed_table_info = version.apply_version_delta(&delta);
129 let gc_change_log_delta = HummockVersion::collect_gc_change_log_delta(
134 self.orig_table_change_log.keys(),
135 &delta.change_log_delta,
136 &delta.removed_table_ids,
137 &delta.state_table_info_delta,
138 &changed_table_info,
139 );
140 gc_change_log_deltas.extend(gc_change_log_delta);
141 deltas.push(delta);
142 }
143
144 pub(super) fn pre_commit_epoch(
146 &mut self,
147 tables_to_commit: &HashMap<TableId, u64>,
148 new_compaction_groups: Vec<CompactionGroup>,
149 group_id_to_sub_levels: BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>>,
150 new_table_ids: &HashMap<TableId, CompactionGroupId>,
151 new_table_watermarks: HashMap<TableId, TableWatermarks>,
152 change_log_delta: HashMap<TableId, ChangeLogDelta>,
153 vector_index_delta: HashMap<TableId, VectorIndexDelta>,
154 group_id_to_truncate_tables: HashMap<CompactionGroupId, HashSet<TableId>>,
155 ) -> HummockVersionDelta {
156 let mut new_version_delta = self.new_delta();
157 new_version_delta.new_table_watermarks = new_table_watermarks;
158 new_version_delta.change_log_delta = change_log_delta;
159 new_version_delta.vector_index_delta = vector_index_delta;
160
161 for compaction_group in &new_compaction_groups {
162 let group_deltas = &mut new_version_delta
163 .group_deltas
164 .entry(compaction_group.group_id())
165 .or_default()
166 .group_deltas;
167
168 #[expect(deprecated)]
169 group_deltas.push(GroupDelta::GroupConstruct(Box::new(GroupConstruct {
170 group_config: Some(compaction_group.compaction_config().as_ref().clone()),
171 group_id: compaction_group.group_id(),
172 parent_group_id: StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId,
173 new_sst_start_id: HummockSstableId::default(), table_ids: vec![],
175 version: CompatibilityVersion::LATEST as _,
176 split_key: None,
177 })));
178 }
179
180 for (compaction_group_id, sub_levels) in group_id_to_sub_levels {
182 let group_deltas = &mut new_version_delta
183 .group_deltas
184 .entry(compaction_group_id)
185 .or_default()
186 .group_deltas;
187
188 for sub_level in sub_levels {
189 group_deltas.push(GroupDelta::NewL0SubLevel(sub_level));
190 }
191 }
192
193 for (compaction_group_id, table_ids) in group_id_to_truncate_tables {
194 let group_deltas = &mut new_version_delta
195 .group_deltas
196 .entry(compaction_group_id)
197 .or_default()
198 .group_deltas;
199
200 group_deltas.push(GroupDelta::PruneTableIdsFromSsts(
201 table_ids.into_iter().collect(),
202 ));
203 }
204
205 new_version_delta.with_latest_version(|version, delta| {
207 for (table_id, cg_id) in new_table_ids {
208 assert!(
209 !version.state_table_info.info().contains_key(table_id),
210 "newly added table exists previously: {:?}",
211 table_id
212 );
213 let committed_epoch = *tables_to_commit.get(table_id).expect("newly added table must exist in tables_to_commit");
214 delta.state_table_info_delta.insert(
215 *table_id,
216 StateTableInfoDelta {
217 committed_epoch,
218 compaction_group_id: *cg_id,
219 },
220 );
221 }
222
223 for (table_id, committed_epoch) in tables_to_commit {
224 if new_table_ids.contains_key(table_id) {
225 continue;
226 }
227 let info = version.state_table_info.info().get(table_id).unwrap_or_else(|| {
228 panic!("tables_to_commit {:?} contains table_id {} that is not newly added but not exists previously", tables_to_commit, table_id);
229 });
230 assert!(delta
231 .state_table_info_delta
232 .insert(
233 *table_id,
234 StateTableInfoDelta {
235 committed_epoch: *committed_epoch,
236 compaction_group_id: info.compaction_group_id,
237 }
238 )
239 .is_none());
240 }
241 });
242
243 let time_travel_delta = (*new_version_delta).clone();
244 new_version_delta.pre_apply();
245 time_travel_delta
246 }
247}
248
249impl InMemValTransaction for HummockVersionTransaction<'_> {
250 fn commit(self) {
251 if let Some((version, deltas, gc_change_log_deltas)) = self.pre_applied_version {
252 *self.orig_version = version;
253 for delta in &deltas {
254 HummockVersion::apply_change_log_delta(
255 self.orig_table_change_log,
256 &delta.change_log_delta,
257 );
258 }
259 self.orig_table_change_log
260 .retain(|table_id, _| !gc_change_log_deltas.contains(table_id));
261
262 if !self.disable_apply_to_txn {
263 let pb_deltas = deltas.iter().map(|delta| delta.to_protobuf()).collect();
264 self.notification_manager.notify_hummock_without_version(
265 Operation::Add,
266 Info::HummockVersionDeltas(risingwave_pb::hummock::HummockVersionDeltas {
267 version_deltas: pb_deltas,
268 }),
269 );
270 self.notification_manager.notify_frontend_without_version(
271 Operation::Update,
272 Info::HummockVersionDeltas(HummockVersionDeltas {
273 version_deltas: deltas
274 .iter()
275 .map(|delta| {
276 FrontendHummockVersionDelta::from_delta(delta).to_protobuf()
277 })
278 .collect(),
279 }),
280 );
281 if let Some(table_committed_epoch_notifiers) = self.table_committed_epoch_notifiers
282 {
283 table_committed_epoch_notifiers
284 .lock()
285 .notify_deltas(&deltas);
286 }
287 }
288
289 for delta in deltas {
290 assert!(self.orig_deltas.insert(delta.id, delta.clone()).is_none());
291 }
292
293 trigger_delta_log_stats(self.meta_metrics, self.orig_deltas.len());
294 trigger_version_stat(self.meta_metrics, self.orig_version);
295 }
296 }
297}
298
299impl<TXN> ValTransaction<TXN> for HummockVersionTransaction<'_>
300where
301 TXN: ConnectionTrait,
302 HummockVersionDelta: Transactional<TXN>,
303 HummockVersionStats: Transactional<TXN>,
304{
305 async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
306 if self.disable_apply_to_txn {
307 return Ok(());
308 }
309 if let Some((_, deltas, gc_change_log_deltas)) = &self.pre_applied_version {
310 for delta in deltas {
312 delta.upsert_in_transaction(txn).await?;
313 }
314
315 let insert_batch_size = self.opts.table_change_log_insert_batch_size as usize;
316 use futures::stream::{self, StreamExt};
317 use sea_orm::{ColumnTrait, Condition, QueryFilter};
318 let insert_iter = deltas
319 .iter()
320 .flat_map(|i| i.change_log_delta.iter())
321 .map(|(table_id, change_log_delta)| (*table_id, &change_log_delta.new_log));
322 let mut stream = stream::iter(insert_iter).chunks(insert_batch_size);
323 while let Some(change_log_batch) = stream.next().await {
324 let insert_many = change_log_batch
325 .into_iter()
326 .map(|(table_id, change_log)| {
327 to_table_change_log_meta_store_model(table_id, change_log)
328 })
329 .collect::<Vec<_>>();
330 risingwave_meta_model::hummock_table_change_log::Entity::insert_many(insert_many)
331 .on_empty_do_nothing()
332 .exec(txn)
333 .await?;
334 }
335
336 let delete_batch_size = self.opts.table_change_log_delete_batch_size as usize;
337 let delete_iter = deltas
338 .iter()
339 .flat_map(|i| i.change_log_delta.iter())
340 .map(|(table_id, change_log_delta)| (*table_id, change_log_delta.truncate_epoch))
341 .chain(
342 gc_change_log_deltas
343 .iter()
344 .map(|table_id| (*table_id, u64::MAX)),
345 );
346
347 let mut stream = stream::iter(delete_iter).chunks(delete_batch_size);
348 while let Some(change_log_batch) = stream.next().await {
349 let mut condition = Condition::any();
350 for (table_id, truncate_epoch) in change_log_batch {
351 condition = condition.add(
352 Condition::all()
353 .add(risingwave_meta_model::hummock_table_change_log::Column::TableId.eq(table_id))
354 .add(risingwave_meta_model::hummock_table_change_log::Column::CheckpointEpoch.lt(truncate_epoch as Epoch))
355 );
356 }
357 risingwave_meta_model::hummock_table_change_log::Entity::delete_many()
358 .filter(condition)
359 .exec(txn)
360 .await?;
361 }
362 }
363 Ok(())
364 }
365}
366
/// Guard around a single version delta that is being built.
///
/// The delta is reachable through `Deref`/`DerefMut`. It is handed to the parent
/// transaction's `pre_apply` either explicitly via `pre_apply` or when the guard is
/// dropped.
pub(super) struct SingleDeltaTransaction<'a, 'b> {
    /// Parent transaction that receives the delta.
    version_txn: &'b mut HummockVersionTransaction<'a>,
    /// The delta under construction; `None` once it has been handed to the parent.
    delta: Option<HummockVersionDelta>,
}
371
372impl SingleDeltaTransaction<'_, '_> {
373 pub(super) fn latest_version(&self) -> &HummockVersion {
374 self.version_txn.latest_version()
375 }
376
377 pub(super) fn pre_apply(mut self) {
378 self.version_txn.pre_apply(self.delta.take().unwrap());
379 }
380
381 pub(super) fn with_latest_version(
382 &mut self,
383 f: impl FnOnce(&HummockVersion, &mut HummockVersionDelta),
384 ) {
385 f(
386 self.version_txn.latest_version(),
387 self.delta.as_mut().expect("should exist"),
388 )
389 }
390}
391
392impl Deref for SingleDeltaTransaction<'_, '_> {
393 type Target = HummockVersionDelta;
394
395 fn deref(&self) -> &Self::Target {
396 self.delta.as_ref().expect("should exist")
397 }
398}
399
400impl DerefMut for SingleDeltaTransaction<'_, '_> {
401 fn deref_mut(&mut self) -> &mut Self::Target {
402 self.delta.as_mut().expect("should exist")
403 }
404}
405
406impl Drop for SingleDeltaTransaction<'_, '_> {
407 fn drop(&mut self) {
408 if let Some(delta) = self.delta.take() {
409 self.version_txn.pre_apply(delta);
410 }
411 }
412}
413
/// Transaction over the Hummock version stats that also notifies frontends on commit.
pub(super) struct HummockVersionStatsTransaction<'a> {
    /// Underlying value transaction holding the stats.
    stats: VarTransaction<'a, HummockVersionStats>,
    /// Used on commit to push the new stats to frontends.
    notification_manager: &'a NotificationManager,
}
418
419impl<'a> HummockVersionStatsTransaction<'a> {
420 pub(super) fn new(
421 stats: &'a mut HummockVersionStats,
422 notification_manager: &'a NotificationManager,
423 ) -> Self {
424 Self {
425 stats: VarTransaction::new(stats),
426 notification_manager,
427 }
428 }
429}
430
431impl InMemValTransaction for HummockVersionStatsTransaction<'_> {
432 fn commit(self) {
433 if self.stats.has_new_value() {
434 let stats = self.stats.clone();
435 self.stats.commit();
436 self.notification_manager
437 .notify_frontend_without_version(Operation::Update, Info::HummockStats(stats));
438 }
439 }
440}
441
442impl<TXN> ValTransaction<TXN> for HummockVersionStatsTransaction<'_>
443where
444 TXN: ConnectionTrait,
445 HummockVersionStats: Transactional<TXN>,
446{
447 async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
448 self.stats.apply_to_txn(txn).await
449 }
450}
451
452impl Deref for HummockVersionStatsTransaction<'_> {
453 type Target = HummockVersionStats;
454
455 fn deref(&self) -> &Self::Target {
456 self.stats.deref()
457 }
458}
459
460impl DerefMut for HummockVersionStatsTransaction<'_> {
461 fn deref_mut(&mut self) -> &mut Self::Target {
462 self.stats.deref_mut()
463 }
464}