risingwave_meta/hummock/manager/
transaction.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::ops::{Deref, DerefMut};
17
18use parking_lot::Mutex;
19use risingwave_common::catalog::TableId;
20use risingwave_hummock_sdk::change_log::ChangeLogDelta;
21use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
22use risingwave_hummock_sdk::sstable_info::SstableInfo;
23use risingwave_hummock_sdk::table_watermark::TableWatermarks;
24use risingwave_hummock_sdk::version::{GroupDelta, HummockVersion, HummockVersionDelta};
25use risingwave_hummock_sdk::{CompactionGroupId, FrontendHummockVersionDelta, HummockVersionId};
26use risingwave_pb::hummock::{
27    CompatibilityVersion, GroupConstruct, HummockVersionDeltas, HummockVersionStats,
28    StateTableInfoDelta,
29};
30use risingwave_pb::meta::subscribe_response::{Info, Operation};
31
32use super::TableCommittedEpochNotifiers;
33use crate::hummock::model::CompactionGroup;
34use crate::manager::NotificationManager;
35use crate::model::{
36    InMemValTransaction, MetadataModelResult, Transactional, ValTransaction, VarTransaction,
37};
38use crate::rpc::metrics::MetaMetrics;
39
40fn trigger_delta_log_stats(metrics: &MetaMetrics, total_number: usize) {
41    metrics.delta_log_count.set(total_number as _);
42}
43
44fn trigger_version_stat(metrics: &MetaMetrics, current_version: &HummockVersion) {
45    metrics
46        .version_size
47        .set(current_version.estimated_encode_len() as i64);
48    metrics
49        .current_version_id
50        .set(current_version.id.to_u64() as i64);
51}
52
53pub(super) struct HummockVersionTransaction<'a> {
54    orig_version: &'a mut HummockVersion,
55    orig_deltas: &'a mut BTreeMap<HummockVersionId, HummockVersionDelta>,
56    notification_manager: &'a NotificationManager,
57    table_committed_epoch_notifiers: Option<&'a Mutex<TableCommittedEpochNotifiers>>,
58    meta_metrics: &'a MetaMetrics,
59
60    pre_applied_version: Option<(HummockVersion, Vec<HummockVersionDelta>)>,
61    disable_apply_to_txn: bool,
62}
63
64impl<'a> HummockVersionTransaction<'a> {
65    pub(super) fn new(
66        version: &'a mut HummockVersion,
67        deltas: &'a mut BTreeMap<HummockVersionId, HummockVersionDelta>,
68        notification_manager: &'a NotificationManager,
69        table_committed_epoch_notifiers: Option<&'a Mutex<TableCommittedEpochNotifiers>>,
70        meta_metrics: &'a MetaMetrics,
71    ) -> Self {
72        Self {
73            orig_version: version,
74            orig_deltas: deltas,
75            pre_applied_version: None,
76            disable_apply_to_txn: false,
77            notification_manager,
78            table_committed_epoch_notifiers,
79            meta_metrics,
80        }
81    }
82
83    pub(super) fn disable_apply_to_txn(&mut self) {
84        assert!(
85            self.pre_applied_version.is_none(),
86            "should only call disable at the beginning of txn"
87        );
88        self.disable_apply_to_txn = true;
89    }
90
91    pub(super) fn latest_version(&self) -> &HummockVersion {
92        if let Some((version, _)) = &self.pre_applied_version {
93            version
94        } else {
95            self.orig_version
96        }
97    }
98
99    pub(super) fn new_delta<'b>(&'b mut self) -> SingleDeltaTransaction<'a, 'b> {
100        let delta = self.latest_version().version_delta_after();
101        SingleDeltaTransaction {
102            version_txn: self,
103            delta: Some(delta),
104        }
105    }
106
107    fn pre_apply(&mut self, delta: HummockVersionDelta) {
108        let (version, deltas) = self
109            .pre_applied_version
110            .get_or_insert_with(|| (self.orig_version.clone(), Vec::with_capacity(1)));
111        version.apply_version_delta(&delta);
112        deltas.push(delta);
113    }
114
115    /// Returns a duplicate delta, used by time travel.
116    pub(super) fn pre_commit_epoch(
117        &mut self,
118        tables_to_commit: &HashMap<TableId, u64>,
119        new_compaction_groups: Vec<CompactionGroup>,
120        group_id_to_sub_levels: BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>>,
121        new_table_ids: &HashMap<TableId, CompactionGroupId>,
122        new_table_watermarks: HashMap<TableId, TableWatermarks>,
123        change_log_delta: HashMap<TableId, ChangeLogDelta>,
124    ) -> HummockVersionDelta {
125        let mut new_version_delta = self.new_delta();
126        new_version_delta.new_table_watermarks = new_table_watermarks;
127        new_version_delta.change_log_delta = change_log_delta;
128
129        for compaction_group in &new_compaction_groups {
130            let group_deltas = &mut new_version_delta
131                .group_deltas
132                .entry(compaction_group.group_id())
133                .or_default()
134                .group_deltas;
135
136            #[expect(deprecated)]
137            group_deltas.push(GroupDelta::GroupConstruct(Box::new(GroupConstruct {
138                group_config: Some(compaction_group.compaction_config().as_ref().clone()),
139                group_id: compaction_group.group_id(),
140                parent_group_id: StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId,
141                new_sst_start_id: 0, // No need to set it when `NewCompactionGroup`
142                table_ids: vec![],
143                version: CompatibilityVersion::LATEST as _,
144                split_key: None,
145            })));
146        }
147
148        // Append SSTs to a new version.
149        for (compaction_group_id, sub_levels) in group_id_to_sub_levels {
150            let group_deltas = &mut new_version_delta
151                .group_deltas
152                .entry(compaction_group_id)
153                .or_default()
154                .group_deltas;
155
156            for sub_level in sub_levels {
157                group_deltas.push(GroupDelta::NewL0SubLevel(sub_level));
158            }
159        }
160
161        // update state table info
162        new_version_delta.with_latest_version(|version, delta| {
163            for (table_id, cg_id) in new_table_ids {
164                assert!(
165                    !version.state_table_info.info().contains_key(table_id),
166                    "newly added table exists previously: {:?}",
167                    table_id
168                );
169                let committed_epoch = *tables_to_commit.get(table_id).expect("newly added table must exist in tables_to_commit");
170                delta.state_table_info_delta.insert(
171                    *table_id,
172                    StateTableInfoDelta {
173                        committed_epoch,
174                        compaction_group_id: *cg_id,
175                    },
176                );
177            }
178
179            for (table_id, committed_epoch) in tables_to_commit {
180                if new_table_ids.contains_key(table_id) {
181                    continue;
182                }
183                let info = version.state_table_info.info().get(table_id).unwrap_or_else(|| {
184                    panic!("tables_to_commit {:?} contains table_id {} that is not newly added but not exists previously", tables_to_commit, table_id);
185                });
186                assert!(delta
187                    .state_table_info_delta
188                    .insert(
189                        *table_id,
190                        StateTableInfoDelta {
191                            committed_epoch: *committed_epoch,
192                            compaction_group_id: info.compaction_group_id,
193                        }
194                    )
195                    .is_none());
196            }
197        });
198
199        let time_travel_delta = (*new_version_delta).clone();
200        new_version_delta.pre_apply();
201        time_travel_delta
202    }
203}
204
205impl InMemValTransaction for HummockVersionTransaction<'_> {
206    fn commit(self) {
207        if let Some((version, deltas)) = self.pre_applied_version {
208            *self.orig_version = version;
209            if !self.disable_apply_to_txn {
210                let pb_deltas = deltas.iter().map(|delta| delta.to_protobuf()).collect();
211                self.notification_manager.notify_hummock_without_version(
212                    Operation::Add,
213                    Info::HummockVersionDeltas(risingwave_pb::hummock::HummockVersionDeltas {
214                        version_deltas: pb_deltas,
215                    }),
216                );
217                self.notification_manager.notify_frontend_without_version(
218                    Operation::Update,
219                    Info::HummockVersionDeltas(HummockVersionDeltas {
220                        version_deltas: deltas
221                            .iter()
222                            .map(|delta| {
223                                FrontendHummockVersionDelta::from_delta(delta).to_protobuf()
224                            })
225                            .collect(),
226                    }),
227                );
228                if let Some(table_committed_epoch_notifiers) = self.table_committed_epoch_notifiers
229                {
230                    table_committed_epoch_notifiers
231                        .lock()
232                        .notify_deltas(&deltas);
233                }
234            }
235            for delta in deltas {
236                assert!(self.orig_deltas.insert(delta.id, delta.clone()).is_none());
237            }
238
239            trigger_delta_log_stats(self.meta_metrics, self.orig_deltas.len());
240            trigger_version_stat(self.meta_metrics, self.orig_version);
241        }
242    }
243}
244
245impl<TXN> ValTransaction<TXN> for HummockVersionTransaction<'_>
246where
247    HummockVersionDelta: Transactional<TXN>,
248    HummockVersionStats: Transactional<TXN>,
249{
250    async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
251        if self.disable_apply_to_txn {
252            return Ok(());
253        }
254        for delta in self
255            .pre_applied_version
256            .iter()
257            .flat_map(|(_, deltas)| deltas.iter())
258        {
259            delta.upsert_in_transaction(txn).await?;
260        }
261        Ok(())
262    }
263}
264
265pub(super) struct SingleDeltaTransaction<'a, 'b> {
266    version_txn: &'b mut HummockVersionTransaction<'a>,
267    delta: Option<HummockVersionDelta>,
268}
269
270impl SingleDeltaTransaction<'_, '_> {
271    pub(super) fn latest_version(&self) -> &HummockVersion {
272        self.version_txn.latest_version()
273    }
274
275    pub(super) fn pre_apply(mut self) {
276        self.version_txn.pre_apply(self.delta.take().unwrap());
277    }
278
279    pub(super) fn with_latest_version(
280        &mut self,
281        f: impl FnOnce(&HummockVersion, &mut HummockVersionDelta),
282    ) {
283        f(
284            self.version_txn.latest_version(),
285            self.delta.as_mut().expect("should exist"),
286        )
287    }
288}
289
290impl Deref for SingleDeltaTransaction<'_, '_> {
291    type Target = HummockVersionDelta;
292
293    fn deref(&self) -> &Self::Target {
294        self.delta.as_ref().expect("should exist")
295    }
296}
297
298impl DerefMut for SingleDeltaTransaction<'_, '_> {
299    fn deref_mut(&mut self) -> &mut Self::Target {
300        self.delta.as_mut().expect("should exist")
301    }
302}
303
304impl Drop for SingleDeltaTransaction<'_, '_> {
305    fn drop(&mut self) {
306        if let Some(delta) = self.delta.take() {
307            self.version_txn.pre_apply(delta);
308        }
309    }
310}
311
312pub(super) struct HummockVersionStatsTransaction<'a> {
313    stats: VarTransaction<'a, HummockVersionStats>,
314    notification_manager: &'a NotificationManager,
315}
316
317impl<'a> HummockVersionStatsTransaction<'a> {
318    pub(super) fn new(
319        stats: &'a mut HummockVersionStats,
320        notification_manager: &'a NotificationManager,
321    ) -> Self {
322        Self {
323            stats: VarTransaction::new(stats),
324            notification_manager,
325        }
326    }
327}
328
329impl InMemValTransaction for HummockVersionStatsTransaction<'_> {
330    fn commit(self) {
331        if self.stats.has_new_value() {
332            let stats = self.stats.clone();
333            self.stats.commit();
334            self.notification_manager
335                .notify_frontend_without_version(Operation::Update, Info::HummockStats(stats));
336        }
337    }
338}
339
340impl<TXN> ValTransaction<TXN> for HummockVersionStatsTransaction<'_>
341where
342    HummockVersionStats: Transactional<TXN>,
343{
344    async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
345        self.stats.apply_to_txn(txn).await
346    }
347}
348
349impl Deref for HummockVersionStatsTransaction<'_> {
350    type Target = HummockVersionStats;
351
352    fn deref(&self) -> &Self::Target {
353        self.stats.deref()
354    }
355}
356
357impl DerefMut for HummockVersionStatsTransaction<'_> {
358    fn deref_mut(&mut self) -> &mut Self::Target {
359        self.stats.deref_mut()
360    }
361}