risingwave_meta/hummock/manager/
transaction.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::ops::{Deref, DerefMut};
17
18use parking_lot::Mutex;
19use risingwave_common::catalog::TableId;
20use risingwave_hummock_sdk::change_log::ChangeLogDelta;
21use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
22use risingwave_hummock_sdk::sstable_info::SstableInfo;
23use risingwave_hummock_sdk::table_watermark::TableWatermarks;
24use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
25use risingwave_hummock_sdk::version::{GroupDelta, HummockVersion, HummockVersionDelta};
26use risingwave_hummock_sdk::{CompactionGroupId, FrontendHummockVersionDelta, HummockVersionId};
27use risingwave_pb::hummock::{
28    CompatibilityVersion, GroupConstruct, HummockVersionDeltas, HummockVersionStats,
29    StateTableInfoDelta,
30};
31use risingwave_pb::meta::subscribe_response::{Info, Operation};
32
33use super::TableCommittedEpochNotifiers;
34use crate::hummock::model::CompactionGroup;
35use crate::manager::NotificationManager;
36use crate::model::{
37    InMemValTransaction, MetadataModelResult, Transactional, ValTransaction, VarTransaction,
38};
39use crate::rpc::metrics::MetaMetrics;
40
41fn trigger_delta_log_stats(metrics: &MetaMetrics, total_number: usize) {
42    metrics.delta_log_count.set(total_number as _);
43}
44
45fn trigger_version_stat(metrics: &MetaMetrics, current_version: &HummockVersion) {
46    metrics
47        .version_size
48        .set(current_version.estimated_encode_len() as i64);
49    metrics
50        .current_version_id
51        .set(current_version.id.to_u64() as i64);
52}
53
54pub(super) struct HummockVersionTransaction<'a> {
55    orig_version: &'a mut HummockVersion,
56    orig_deltas: &'a mut BTreeMap<HummockVersionId, HummockVersionDelta>,
57    notification_manager: &'a NotificationManager,
58    table_committed_epoch_notifiers: Option<&'a Mutex<TableCommittedEpochNotifiers>>,
59    meta_metrics: &'a MetaMetrics,
60
61    pre_applied_version: Option<(HummockVersion, Vec<HummockVersionDelta>)>,
62    disable_apply_to_txn: bool,
63}
64
65impl<'a> HummockVersionTransaction<'a> {
66    pub(super) fn new(
67        version: &'a mut HummockVersion,
68        deltas: &'a mut BTreeMap<HummockVersionId, HummockVersionDelta>,
69        notification_manager: &'a NotificationManager,
70        table_committed_epoch_notifiers: Option<&'a Mutex<TableCommittedEpochNotifiers>>,
71        meta_metrics: &'a MetaMetrics,
72    ) -> Self {
73        Self {
74            orig_version: version,
75            orig_deltas: deltas,
76            pre_applied_version: None,
77            disable_apply_to_txn: false,
78            notification_manager,
79            table_committed_epoch_notifiers,
80            meta_metrics,
81        }
82    }
83
84    pub(super) fn disable_apply_to_txn(&mut self) {
85        assert!(
86            self.pre_applied_version.is_none(),
87            "should only call disable at the beginning of txn"
88        );
89        self.disable_apply_to_txn = true;
90    }
91
92    pub(super) fn latest_version(&self) -> &HummockVersion {
93        if let Some((version, _)) = &self.pre_applied_version {
94            version
95        } else {
96            self.orig_version
97        }
98    }
99
100    pub(super) fn new_delta<'b>(&'b mut self) -> SingleDeltaTransaction<'a, 'b> {
101        let delta = self.latest_version().version_delta_after();
102        SingleDeltaTransaction {
103            version_txn: self,
104            delta: Some(delta),
105        }
106    }
107
108    fn pre_apply(&mut self, delta: HummockVersionDelta) {
109        let (version, deltas) = self
110            .pre_applied_version
111            .get_or_insert_with(|| (self.orig_version.clone(), Vec::with_capacity(1)));
112        version.apply_version_delta(&delta);
113        deltas.push(delta);
114    }
115
116    /// Returns a duplicate delta, used by time travel.
117    pub(super) fn pre_commit_epoch(
118        &mut self,
119        tables_to_commit: &HashMap<TableId, u64>,
120        new_compaction_groups: Vec<CompactionGroup>,
121        group_id_to_sub_levels: BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>>,
122        new_table_ids: &HashMap<TableId, CompactionGroupId>,
123        new_table_watermarks: HashMap<TableId, TableWatermarks>,
124        change_log_delta: HashMap<TableId, ChangeLogDelta>,
125        vector_index_delta: HashMap<TableId, VectorIndexDelta>,
126        group_id_to_truncate_tables: HashMap<CompactionGroupId, Vec<TableId>>,
127    ) -> HummockVersionDelta {
128        let mut new_version_delta = self.new_delta();
129        new_version_delta.new_table_watermarks = new_table_watermarks;
130        new_version_delta.change_log_delta = change_log_delta;
131        new_version_delta.vector_index_delta = vector_index_delta;
132
133        for compaction_group in &new_compaction_groups {
134            let group_deltas = &mut new_version_delta
135                .group_deltas
136                .entry(compaction_group.group_id())
137                .or_default()
138                .group_deltas;
139
140            #[expect(deprecated)]
141            group_deltas.push(GroupDelta::GroupConstruct(Box::new(GroupConstruct {
142                group_config: Some(compaction_group.compaction_config().as_ref().clone()),
143                group_id: compaction_group.group_id(),
144                parent_group_id: StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId,
145                new_sst_start_id: 0, // No need to set it when `NewCompactionGroup`
146                table_ids: vec![],
147                version: CompatibilityVersion::LATEST as _,
148                split_key: None,
149            })));
150        }
151
152        // Append SSTs to a new version.
153        for (compaction_group_id, sub_levels) in group_id_to_sub_levels {
154            let group_deltas = &mut new_version_delta
155                .group_deltas
156                .entry(compaction_group_id)
157                .or_default()
158                .group_deltas;
159
160            for sub_level in sub_levels {
161                group_deltas.push(GroupDelta::NewL0SubLevel(sub_level));
162            }
163        }
164
165        for (compaction_group_id, table_ids) in group_id_to_truncate_tables {
166            let group_deltas = &mut new_version_delta
167                .group_deltas
168                .entry(compaction_group_id)
169                .or_default()
170                .group_deltas;
171
172            group_deltas.push(GroupDelta::TruncateTables(
173                table_ids.into_iter().map(|id| id.into()).collect(),
174            ));
175        }
176
177        // update state table info
178        new_version_delta.with_latest_version(|version, delta| {
179            for (table_id, cg_id) in new_table_ids {
180                assert!(
181                    !version.state_table_info.info().contains_key(table_id),
182                    "newly added table exists previously: {:?}",
183                    table_id
184                );
185                let committed_epoch = *tables_to_commit.get(table_id).expect("newly added table must exist in tables_to_commit");
186                delta.state_table_info_delta.insert(
187                    *table_id,
188                    StateTableInfoDelta {
189                        committed_epoch,
190                        compaction_group_id: *cg_id,
191                    },
192                );
193            }
194
195            for (table_id, committed_epoch) in tables_to_commit {
196                if new_table_ids.contains_key(table_id) {
197                    continue;
198                }
199                let info = version.state_table_info.info().get(table_id).unwrap_or_else(|| {
200                    panic!("tables_to_commit {:?} contains table_id {} that is not newly added but not exists previously", tables_to_commit, table_id);
201                });
202                assert!(delta
203                    .state_table_info_delta
204                    .insert(
205                        *table_id,
206                        StateTableInfoDelta {
207                            committed_epoch: *committed_epoch,
208                            compaction_group_id: info.compaction_group_id,
209                        }
210                    )
211                    .is_none());
212            }
213        });
214
215        let time_travel_delta = (*new_version_delta).clone();
216        new_version_delta.pre_apply();
217        time_travel_delta
218    }
219}
220
221impl InMemValTransaction for HummockVersionTransaction<'_> {
222    fn commit(self) {
223        if let Some((version, deltas)) = self.pre_applied_version {
224            *self.orig_version = version;
225            if !self.disable_apply_to_txn {
226                let pb_deltas = deltas.iter().map(|delta| delta.to_protobuf()).collect();
227                self.notification_manager.notify_hummock_without_version(
228                    Operation::Add,
229                    Info::HummockVersionDeltas(risingwave_pb::hummock::HummockVersionDeltas {
230                        version_deltas: pb_deltas,
231                    }),
232                );
233                self.notification_manager.notify_frontend_without_version(
234                    Operation::Update,
235                    Info::HummockVersionDeltas(HummockVersionDeltas {
236                        version_deltas: deltas
237                            .iter()
238                            .map(|delta| {
239                                FrontendHummockVersionDelta::from_delta(delta).to_protobuf()
240                            })
241                            .collect(),
242                    }),
243                );
244                if let Some(table_committed_epoch_notifiers) = self.table_committed_epoch_notifiers
245                {
246                    table_committed_epoch_notifiers
247                        .lock()
248                        .notify_deltas(&deltas);
249                }
250            }
251            for delta in deltas {
252                assert!(self.orig_deltas.insert(delta.id, delta.clone()).is_none());
253            }
254
255            trigger_delta_log_stats(self.meta_metrics, self.orig_deltas.len());
256            trigger_version_stat(self.meta_metrics, self.orig_version);
257        }
258    }
259}
260
261impl<TXN> ValTransaction<TXN> for HummockVersionTransaction<'_>
262where
263    HummockVersionDelta: Transactional<TXN>,
264    HummockVersionStats: Transactional<TXN>,
265{
266    async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
267        if self.disable_apply_to_txn {
268            return Ok(());
269        }
270        for delta in self
271            .pre_applied_version
272            .iter()
273            .flat_map(|(_, deltas)| deltas.iter())
274        {
275            delta.upsert_in_transaction(txn).await?;
276        }
277        Ok(())
278    }
279}
280
281pub(super) struct SingleDeltaTransaction<'a, 'b> {
282    version_txn: &'b mut HummockVersionTransaction<'a>,
283    delta: Option<HummockVersionDelta>,
284}
285
286impl SingleDeltaTransaction<'_, '_> {
287    pub(super) fn latest_version(&self) -> &HummockVersion {
288        self.version_txn.latest_version()
289    }
290
291    pub(super) fn pre_apply(mut self) {
292        self.version_txn.pre_apply(self.delta.take().unwrap());
293    }
294
295    pub(super) fn with_latest_version(
296        &mut self,
297        f: impl FnOnce(&HummockVersion, &mut HummockVersionDelta),
298    ) {
299        f(
300            self.version_txn.latest_version(),
301            self.delta.as_mut().expect("should exist"),
302        )
303    }
304}
305
306impl Deref for SingleDeltaTransaction<'_, '_> {
307    type Target = HummockVersionDelta;
308
309    fn deref(&self) -> &Self::Target {
310        self.delta.as_ref().expect("should exist")
311    }
312}
313
314impl DerefMut for SingleDeltaTransaction<'_, '_> {
315    fn deref_mut(&mut self) -> &mut Self::Target {
316        self.delta.as_mut().expect("should exist")
317    }
318}
319
320impl Drop for SingleDeltaTransaction<'_, '_> {
321    fn drop(&mut self) {
322        if let Some(delta) = self.delta.take() {
323            self.version_txn.pre_apply(delta);
324        }
325    }
326}
327
328pub(super) struct HummockVersionStatsTransaction<'a> {
329    stats: VarTransaction<'a, HummockVersionStats>,
330    notification_manager: &'a NotificationManager,
331}
332
333impl<'a> HummockVersionStatsTransaction<'a> {
334    pub(super) fn new(
335        stats: &'a mut HummockVersionStats,
336        notification_manager: &'a NotificationManager,
337    ) -> Self {
338        Self {
339            stats: VarTransaction::new(stats),
340            notification_manager,
341        }
342    }
343}
344
345impl InMemValTransaction for HummockVersionStatsTransaction<'_> {
346    fn commit(self) {
347        if self.stats.has_new_value() {
348            let stats = self.stats.clone();
349            self.stats.commit();
350            self.notification_manager
351                .notify_frontend_without_version(Operation::Update, Info::HummockStats(stats));
352        }
353    }
354}
355
356impl<TXN> ValTransaction<TXN> for HummockVersionStatsTransaction<'_>
357where
358    HummockVersionStats: Transactional<TXN>,
359{
360    async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
361        self.stats.apply_to_txn(txn).await
362    }
363}
364
365impl Deref for HummockVersionStatsTransaction<'_> {
366    type Target = HummockVersionStats;
367
368    fn deref(&self) -> &Self::Target {
369        self.stats.deref()
370    }
371}
372
373impl DerefMut for HummockVersionStatsTransaction<'_> {
374    fn deref_mut(&mut self) -> &mut Self::Target {
375        self.stats.deref_mut()
376    }
377}