1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ops::{Deref, DerefMut};
17
18use parking_lot::Mutex;
19use risingwave_common::catalog::TableId;
20use risingwave_hummock_sdk::change_log::{ChangeLogDelta, TableChangeLog};
21use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
22use risingwave_hummock_sdk::sstable_info::SstableInfo;
23use risingwave_hummock_sdk::table_watermark::TableWatermarks;
24use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
25use risingwave_hummock_sdk::version::{GroupDelta, HummockVersion, HummockVersionDelta};
26use risingwave_hummock_sdk::{
27 CompactionGroupId, FrontendHummockVersionDelta, HummockSstableId, HummockVersionId,
28};
29use risingwave_meta_model::Epoch;
30use risingwave_pb::hummock::{
31 CompatibilityVersion, GroupConstruct, HummockVersionDeltas, HummockVersionStats,
32 StateTableInfoDelta,
33};
34use risingwave_pb::meta::subscribe_response::{Info, Operation};
35use sea_orm::{ConnectionTrait, EntityTrait};
36
37use super::TableCommittedEpochNotifiers;
38use crate::hummock::model::CompactionGroup;
39use crate::hummock::model::ext::to_table_change_log_meta_store_model;
40use crate::manager::{MetaOpts, NotificationManager};
41use crate::model::{
42 InMemValTransaction, MetadataModelResult, Transactional, ValTransaction, VarTransaction,
43};
44use crate::rpc::metrics::MetaMetrics;
45
46fn trigger_delta_log_stats(metrics: &MetaMetrics, total_number: usize) {
47 metrics.delta_log_count.set(total_number as _);
48}
49
50fn trigger_version_stat(metrics: &MetaMetrics, current_version: &HummockVersion) {
51 metrics
52 .version_size
53 .set(current_version.estimated_encode_len() as i64);
54 metrics
55 .current_version_id
56 .set(current_version.id.as_i64_id());
57}
58
/// In-memory transaction over a [`HummockVersion`] and the state kept alongside it.
///
/// Deltas are first applied to a private copy of the version (`pre_applied_version`); the
/// borrowed originals are only modified when the transaction is committed.
pub(super) struct HummockVersionTransaction<'a> {
    /// Version the transaction started from; overwritten with the pre-applied version on commit.
    orig_version: &'a mut HummockVersion,
    /// Delta map that receives every pre-applied delta, keyed by delta id, on commit.
    orig_deltas: &'a mut BTreeMap<HummockVersionId, HummockVersionDelta>,
    /// Per-table change logs. On commit the change-log deltas are applied to it and the entries
    /// of tables collected for GC are removed.
    orig_table_change_log: &'a mut HashMap<TableId, TableChangeLog>,
    /// Used on commit to publish the committed deltas.
    notification_manager: &'a NotificationManager,
    /// When present, it is handed the committed deltas on commit.
    table_committed_epoch_notifiers: Option<&'a Mutex<TableCommittedEpochNotifiers>>,
    /// Metrics refreshed at the end of commit.
    meta_metrics: &'a MetaMetrics,

    /// Lazily created working state: the version with all pending deltas applied, the pending
    /// deltas in application order, and the ids of tables whose change log is to be dropped.
    pre_applied_version: Option<(HummockVersion, Vec<HummockVersionDelta>, HashSet<TableId>)>,
    /// When `true`, `apply_to_txn` writes nothing and `commit` sends no notifications.
    disable_apply_to_txn: bool,
    /// Options; `apply_to_txn` reads the change-log insert/delete batch sizes from it.
    opts: &'a MetaOpts,
}
71
72impl<'a> HummockVersionTransaction<'a> {
73 pub(super) fn new(
74 version: &'a mut HummockVersion,
75 deltas: &'a mut BTreeMap<HummockVersionId, HummockVersionDelta>,
76 table_change_log: &'a mut HashMap<TableId, TableChangeLog>,
77 notification_manager: &'a NotificationManager,
78 table_committed_epoch_notifiers: Option<&'a Mutex<TableCommittedEpochNotifiers>>,
79 meta_metrics: &'a MetaMetrics,
80 opts: &'a MetaOpts,
81 ) -> Self {
82 Self {
83 orig_version: version,
84 orig_deltas: deltas,
85 orig_table_change_log: table_change_log,
86 pre_applied_version: None,
87 disable_apply_to_txn: false,
88 notification_manager,
89 table_committed_epoch_notifiers,
90 meta_metrics,
91 opts,
92 }
93 }
94
95 pub(super) fn disable_apply_to_txn(&mut self) {
96 assert!(
97 self.pre_applied_version.is_none(),
98 "should only call disable at the beginning of txn"
99 );
100 self.disable_apply_to_txn = true;
101 }
102
103 pub(super) fn latest_version(&self) -> &HummockVersion {
104 if let Some((version, _, _)) = &self.pre_applied_version {
105 version
106 } else {
107 self.orig_version
108 }
109 }
110
111 pub(super) fn new_delta<'b>(&'b mut self) -> SingleDeltaTransaction<'a, 'b> {
112 let delta = self.latest_version().version_delta_after();
113 SingleDeltaTransaction {
114 version_txn: self,
115 delta: Some(delta),
116 }
117 }
118
119 fn pre_apply(&mut self, delta: HummockVersionDelta) {
120 let (version, deltas, gc_change_log_deltas) =
121 self.pre_applied_version.get_or_insert_with(|| {
122 (
123 self.orig_version.clone(),
124 Vec::with_capacity(1),
125 HashSet::new(),
126 )
127 });
128 let changed_table_info = version.apply_version_delta(&delta);
129 let gc_change_log_delta = HummockVersion::collect_gc_change_log_delta(
134 self.orig_table_change_log.keys(),
135 &delta.change_log_delta,
136 &delta.removed_table_ids,
137 &delta.state_table_info_delta,
138 &changed_table_info,
139 );
140 gc_change_log_deltas.extend(gc_change_log_delta);
141 deltas.push(delta);
142 }
143
144 pub(super) fn pre_commit_epoch(
146 &mut self,
147 tables_to_commit: &HashMap<TableId, u64>,
148 new_compaction_groups: Vec<CompactionGroup>,
149 group_id_to_sub_levels: BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>>,
150 new_table_ids: &HashMap<TableId, CompactionGroupId>,
151 new_table_watermarks: HashMap<TableId, TableWatermarks>,
152 change_log_delta: HashMap<TableId, ChangeLogDelta>,
153 vector_index_delta: HashMap<TableId, VectorIndexDelta>,
154 group_id_to_truncate_tables: HashMap<CompactionGroupId, HashSet<TableId>>,
155 ) -> HummockVersionDelta {
156 let mut new_version_delta = self.new_delta();
157 new_version_delta.new_table_watermarks = new_table_watermarks;
158 new_version_delta.change_log_delta = change_log_delta;
159 new_version_delta.vector_index_delta = vector_index_delta;
160
161 for compaction_group in &new_compaction_groups {
162 let group_deltas = &mut new_version_delta
163 .group_deltas
164 .entry(compaction_group.group_id())
165 .or_default()
166 .group_deltas;
167
168 #[expect(deprecated)]
169 group_deltas.push(GroupDelta::GroupConstruct(Box::new(GroupConstruct {
170 group_config: Some(compaction_group.compaction_config().as_ref().clone()),
171 group_id: compaction_group.group_id(),
172 parent_group_id: StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId,
173 new_sst_start_id: HummockSstableId::default(), table_ids: vec![],
175 version: CompatibilityVersion::LATEST as _,
176 split_key: None,
177 })));
178 }
179
180 for (compaction_group_id, sub_levels) in group_id_to_sub_levels {
182 let group_deltas = &mut new_version_delta
183 .group_deltas
184 .entry(compaction_group_id)
185 .or_default()
186 .group_deltas;
187
188 for sub_level in sub_levels {
189 group_deltas.push(GroupDelta::NewL0SubLevel(sub_level));
190 }
191 }
192
193 for (compaction_group_id, table_ids) in group_id_to_truncate_tables {
194 let group_deltas = &mut new_version_delta
195 .group_deltas
196 .entry(compaction_group_id)
197 .or_default()
198 .group_deltas;
199
200 group_deltas.push(GroupDelta::TruncateTables(table_ids.into_iter().collect()));
201 }
202
203 new_version_delta.with_latest_version(|version, delta| {
205 for (table_id, cg_id) in new_table_ids {
206 assert!(
207 !version.state_table_info.info().contains_key(table_id),
208 "newly added table exists previously: {:?}",
209 table_id
210 );
211 let committed_epoch = *tables_to_commit.get(table_id).expect("newly added table must exist in tables_to_commit");
212 delta.state_table_info_delta.insert(
213 *table_id,
214 StateTableInfoDelta {
215 committed_epoch,
216 compaction_group_id: *cg_id,
217 },
218 );
219 }
220
221 for (table_id, committed_epoch) in tables_to_commit {
222 if new_table_ids.contains_key(table_id) {
223 continue;
224 }
225 let info = version.state_table_info.info().get(table_id).unwrap_or_else(|| {
226 panic!("tables_to_commit {:?} contains table_id {} that is not newly added but not exists previously", tables_to_commit, table_id);
227 });
228 assert!(delta
229 .state_table_info_delta
230 .insert(
231 *table_id,
232 StateTableInfoDelta {
233 committed_epoch: *committed_epoch,
234 compaction_group_id: info.compaction_group_id,
235 }
236 )
237 .is_none());
238 }
239 });
240
241 let time_travel_delta = (*new_version_delta).clone();
242 new_version_delta.pre_apply();
243 time_travel_delta
244 }
245}
246
247impl InMemValTransaction for HummockVersionTransaction<'_> {
248 fn commit(self) {
249 if let Some((version, deltas, gc_change_log_deltas)) = self.pre_applied_version {
250 *self.orig_version = version;
251 for delta in &deltas {
252 HummockVersion::apply_change_log_delta(
253 self.orig_table_change_log,
254 &delta.change_log_delta,
255 );
256 }
257 self.orig_table_change_log
258 .retain(|table_id, _| !gc_change_log_deltas.contains(table_id));
259
260 if !self.disable_apply_to_txn {
261 let pb_deltas = deltas.iter().map(|delta| delta.to_protobuf()).collect();
262 self.notification_manager.notify_hummock_without_version(
263 Operation::Add,
264 Info::HummockVersionDeltas(risingwave_pb::hummock::HummockVersionDeltas {
265 version_deltas: pb_deltas,
266 }),
267 );
268 self.notification_manager.notify_frontend_without_version(
269 Operation::Update,
270 Info::HummockVersionDeltas(HummockVersionDeltas {
271 version_deltas: deltas
272 .iter()
273 .map(|delta| {
274 FrontendHummockVersionDelta::from_delta(delta).to_protobuf()
275 })
276 .collect(),
277 }),
278 );
279 if let Some(table_committed_epoch_notifiers) = self.table_committed_epoch_notifiers
280 {
281 table_committed_epoch_notifiers
282 .lock()
283 .notify_deltas(&deltas);
284 }
285 }
286
287 for delta in deltas {
288 assert!(self.orig_deltas.insert(delta.id, delta.clone()).is_none());
289 }
290
291 trigger_delta_log_stats(self.meta_metrics, self.orig_deltas.len());
292 trigger_version_stat(self.meta_metrics, self.orig_version);
293 }
294 }
295}
296
297impl<TXN> ValTransaction<TXN> for HummockVersionTransaction<'_>
298where
299 TXN: ConnectionTrait,
300 HummockVersionDelta: Transactional<TXN>,
301 HummockVersionStats: Transactional<TXN>,
302{
303 async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
304 if self.disable_apply_to_txn {
305 return Ok(());
306 }
307 if let Some((_, deltas, gc_change_log_deltas)) = &self.pre_applied_version {
308 for delta in deltas {
310 delta.upsert_in_transaction(txn).await?;
311 }
312
313 let insert_batch_size = self.opts.table_change_log_insert_batch_size as usize;
314 use futures::stream::{self, StreamExt};
315 use sea_orm::{ColumnTrait, Condition, QueryFilter};
316 let insert_iter = deltas
317 .iter()
318 .flat_map(|i| i.change_log_delta.iter())
319 .map(|(table_id, change_log_delta)| (*table_id, &change_log_delta.new_log));
320 let mut stream = stream::iter(insert_iter).chunks(insert_batch_size);
321 while let Some(change_log_batch) = stream.next().await {
322 let insert_many = change_log_batch
323 .into_iter()
324 .map(|(table_id, change_log)| {
325 to_table_change_log_meta_store_model(table_id, change_log)
326 })
327 .collect::<Vec<_>>();
328 risingwave_meta_model::hummock_table_change_log::Entity::insert_many(insert_many)
329 .on_empty_do_nothing()
330 .exec(txn)
331 .await?;
332 }
333
334 let delete_batch_size = self.opts.table_change_log_delete_batch_size as usize;
335 let delete_iter = deltas
336 .iter()
337 .flat_map(|i| i.change_log_delta.iter())
338 .map(|(table_id, change_log_delta)| (*table_id, change_log_delta.truncate_epoch))
339 .chain(
340 gc_change_log_deltas
341 .iter()
342 .map(|table_id| (*table_id, u64::MAX)),
343 );
344
345 let mut stream = stream::iter(delete_iter).chunks(delete_batch_size);
346 while let Some(change_log_batch) = stream.next().await {
347 let mut condition = Condition::any();
348 for (table_id, truncate_epoch) in change_log_batch {
349 condition = condition.add(
350 Condition::all()
351 .add(risingwave_meta_model::hummock_table_change_log::Column::TableId.eq(table_id))
352 .add(risingwave_meta_model::hummock_table_change_log::Column::CheckpointEpoch.lt(truncate_epoch as Epoch))
353 );
354 }
355 risingwave_meta_model::hummock_table_change_log::Entity::delete_many()
356 .filter(condition)
357 .exec(txn)
358 .await?;
359 }
360 }
361 Ok(())
362 }
363}
364
/// Guard around a single [`HummockVersionDelta`] that is being built on top of a
/// [`HummockVersionTransaction`].
///
/// It dereferences to the delta so the delta can be edited in place. The delta is pre-applied
/// into the parent transaction either explicitly via `pre_apply` or when the guard is dropped.
pub(super) struct SingleDeltaTransaction<'a, 'b> {
    /// Parent transaction that receives the delta.
    version_txn: &'b mut HummockVersionTransaction<'a>,
    /// The delta under construction; `None` once it has been handed to the parent transaction.
    delta: Option<HummockVersionDelta>,
}
369
370impl SingleDeltaTransaction<'_, '_> {
371 pub(super) fn latest_version(&self) -> &HummockVersion {
372 self.version_txn.latest_version()
373 }
374
375 pub(super) fn pre_apply(mut self) {
376 self.version_txn.pre_apply(self.delta.take().unwrap());
377 }
378
379 pub(super) fn with_latest_version(
380 &mut self,
381 f: impl FnOnce(&HummockVersion, &mut HummockVersionDelta),
382 ) {
383 f(
384 self.version_txn.latest_version(),
385 self.delta.as_mut().expect("should exist"),
386 )
387 }
388}
389
390impl Deref for SingleDeltaTransaction<'_, '_> {
391 type Target = HummockVersionDelta;
392
393 fn deref(&self) -> &Self::Target {
394 self.delta.as_ref().expect("should exist")
395 }
396}
397
398impl DerefMut for SingleDeltaTransaction<'_, '_> {
399 fn deref_mut(&mut self) -> &mut Self::Target {
400 self.delta.as_mut().expect("should exist")
401 }
402}
403
404impl Drop for SingleDeltaTransaction<'_, '_> {
405 fn drop(&mut self) {
406 if let Some(delta) = self.delta.take() {
407 self.version_txn.pre_apply(delta);
408 }
409 }
410}
411
/// Transaction over [`HummockVersionStats`] that notifies the frontend when a new value is
/// committed.
pub(super) struct HummockVersionStatsTransaction<'a> {
    /// Underlying value transaction holding the stats.
    stats: VarTransaction<'a, HummockVersionStats>,
    /// Used on commit to send the updated stats to the frontend.
    notification_manager: &'a NotificationManager,
}
416
417impl<'a> HummockVersionStatsTransaction<'a> {
418 pub(super) fn new(
419 stats: &'a mut HummockVersionStats,
420 notification_manager: &'a NotificationManager,
421 ) -> Self {
422 Self {
423 stats: VarTransaction::new(stats),
424 notification_manager,
425 }
426 }
427}
428
429impl InMemValTransaction for HummockVersionStatsTransaction<'_> {
430 fn commit(self) {
431 if self.stats.has_new_value() {
432 let stats = self.stats.clone();
433 self.stats.commit();
434 self.notification_manager
435 .notify_frontend_without_version(Operation::Update, Info::HummockStats(stats));
436 }
437 }
438}
439
impl<TXN> ValTransaction<TXN> for HummockVersionStatsTransaction<'_>
where
    TXN: ConnectionTrait,
    HummockVersionStats: Transactional<TXN>,
{
    /// Delegates to the underlying stats transaction's `apply_to_txn` and returns its result.
    async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
        self.stats.apply_to_txn(txn).await
    }
}
449
impl Deref for HummockVersionStatsTransaction<'_> {
    type Target = HummockVersionStats;

    /// Borrows the stats through the underlying transaction's `deref`.
    fn deref(&self) -> &Self::Target {
        self.stats.deref()
    }
}
457
impl DerefMut for HummockVersionStatsTransaction<'_> {
    /// Mutably borrows the stats through the underlying transaction's `deref_mut`.
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.stats.deref_mut()
    }
}