// risingwave_meta/hummock/manager/transaction.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ops::{Deref, DerefMut};
17
18use parking_lot::Mutex;
19use risingwave_common::catalog::TableId;
20use risingwave_hummock_sdk::change_log::{ChangeLogDelta, TableChangeLog};
21use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
22use risingwave_hummock_sdk::sstable_info::SstableInfo;
23use risingwave_hummock_sdk::table_watermark::TableWatermarks;
24use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
25use risingwave_hummock_sdk::version::{GroupDelta, HummockVersion, HummockVersionDelta};
26use risingwave_hummock_sdk::{
27    CompactionGroupId, FrontendHummockVersionDelta, HummockSstableId, HummockVersionId,
28};
29use risingwave_meta_model::Epoch;
30use risingwave_pb::hummock::{
31    CompatibilityVersion, GroupConstruct, HummockVersionDeltas, HummockVersionStats,
32    StateTableInfoDelta,
33};
34use risingwave_pb::meta::subscribe_response::{Info, Operation};
35use sea_orm::{ConnectionTrait, EntityTrait};
36
37use super::TableCommittedEpochNotifiers;
38use crate::hummock::model::CompactionGroup;
39use crate::hummock::model::ext::to_table_change_log_meta_store_model;
40use crate::manager::{MetaOpts, NotificationManager};
41use crate::model::{
42    InMemValTransaction, MetadataModelResult, Transactional, ValTransaction, VarTransaction,
43};
44use crate::rpc::metrics::MetaMetrics;
45
46fn trigger_delta_log_stats(metrics: &MetaMetrics, total_number: usize) {
47    metrics.delta_log_count.set(total_number as _);
48}
49
50fn trigger_version_stat(metrics: &MetaMetrics, current_version: &HummockVersion) {
51    metrics
52        .version_size
53        .set(current_version.estimated_encode_len() as i64);
54    metrics
55        .current_version_id
56        .set(current_version.id.as_i64_id());
57}
58
59pub(super) struct HummockVersionTransaction<'a> {
60    orig_version: &'a mut HummockVersion,
61    orig_deltas: &'a mut BTreeMap<HummockVersionId, HummockVersionDelta>,
62    orig_table_change_log: &'a mut HashMap<TableId, TableChangeLog>,
63    notification_manager: &'a NotificationManager,
64    table_committed_epoch_notifiers: Option<&'a Mutex<TableCommittedEpochNotifiers>>,
65    meta_metrics: &'a MetaMetrics,
66
67    pre_applied_version: Option<(HummockVersion, Vec<HummockVersionDelta>, HashSet<TableId>)>,
68    disable_apply_to_txn: bool,
69    opts: &'a MetaOpts,
70}
71
72impl<'a> HummockVersionTransaction<'a> {
73    pub(super) fn new(
74        version: &'a mut HummockVersion,
75        deltas: &'a mut BTreeMap<HummockVersionId, HummockVersionDelta>,
76        table_change_log: &'a mut HashMap<TableId, TableChangeLog>,
77        notification_manager: &'a NotificationManager,
78        table_committed_epoch_notifiers: Option<&'a Mutex<TableCommittedEpochNotifiers>>,
79        meta_metrics: &'a MetaMetrics,
80        opts: &'a MetaOpts,
81    ) -> Self {
82        Self {
83            orig_version: version,
84            orig_deltas: deltas,
85            orig_table_change_log: table_change_log,
86            pre_applied_version: None,
87            disable_apply_to_txn: false,
88            notification_manager,
89            table_committed_epoch_notifiers,
90            meta_metrics,
91            opts,
92        }
93    }
94
95    pub(super) fn disable_apply_to_txn(&mut self) {
96        assert!(
97            self.pre_applied_version.is_none(),
98            "should only call disable at the beginning of txn"
99        );
100        self.disable_apply_to_txn = true;
101    }
102
103    pub(super) fn latest_version(&self) -> &HummockVersion {
104        if let Some((version, _, _)) = &self.pre_applied_version {
105            version
106        } else {
107            self.orig_version
108        }
109    }
110
111    pub(super) fn new_delta<'b>(&'b mut self) -> SingleDeltaTransaction<'a, 'b> {
112        let delta = self.latest_version().version_delta_after();
113        SingleDeltaTransaction {
114            version_txn: self,
115            delta: Some(delta),
116        }
117    }
118
119    fn pre_apply(&mut self, delta: HummockVersionDelta) {
120        let (version, deltas, gc_change_log_deltas) =
121            self.pre_applied_version.get_or_insert_with(|| {
122                (
123                    self.orig_version.clone(),
124                    Vec::with_capacity(1),
125                    HashSet::new(),
126                )
127            });
128        let changed_table_info = version.apply_version_delta(&delta);
129        // Ideally, the first parameter should be the cumulative state (orig_table_change_log + all applied deltas).
130        // However, currently, we use orig_table_change_log directly because deltas are only applied after a successful metastore write in the end of the transaction.
131        // Consequently, some table eligible for GC are not returned by collect_gc_change_log_delta and are deferred to the next transaction.
132        // This delay is acceptable and does not impact system correctness.
133        let gc_change_log_delta = HummockVersion::collect_gc_change_log_delta(
134            self.orig_table_change_log.keys(),
135            &delta.change_log_delta,
136            &delta.removed_table_ids,
137            &delta.state_table_info_delta,
138            &changed_table_info,
139        );
140        gc_change_log_deltas.extend(gc_change_log_delta);
141        deltas.push(delta);
142    }
143
144    /// Returns a duplicate delta, used by time travel.
145    pub(super) fn pre_commit_epoch(
146        &mut self,
147        tables_to_commit: &HashMap<TableId, u64>,
148        new_compaction_groups: Vec<CompactionGroup>,
149        group_id_to_sub_levels: BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>>,
150        new_table_ids: &HashMap<TableId, CompactionGroupId>,
151        new_table_watermarks: HashMap<TableId, TableWatermarks>,
152        change_log_delta: HashMap<TableId, ChangeLogDelta>,
153        vector_index_delta: HashMap<TableId, VectorIndexDelta>,
154        group_id_to_truncate_tables: HashMap<CompactionGroupId, HashSet<TableId>>,
155    ) -> HummockVersionDelta {
156        let mut new_version_delta = self.new_delta();
157        new_version_delta.new_table_watermarks = new_table_watermarks;
158        new_version_delta.change_log_delta = change_log_delta;
159        new_version_delta.vector_index_delta = vector_index_delta;
160
161        for compaction_group in &new_compaction_groups {
162            let group_deltas = &mut new_version_delta
163                .group_deltas
164                .entry(compaction_group.group_id())
165                .or_default()
166                .group_deltas;
167
168            #[expect(deprecated)]
169            group_deltas.push(GroupDelta::GroupConstruct(Box::new(GroupConstruct {
170                group_config: Some(compaction_group.compaction_config().as_ref().clone()),
171                group_id: compaction_group.group_id(),
172                parent_group_id: StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId,
173                new_sst_start_id: HummockSstableId::default(), // No need to set it when `NewCompactionGroup`
174                table_ids: vec![],
175                version: CompatibilityVersion::LATEST as _,
176                split_key: None,
177            })));
178        }
179
180        // Append SSTs to a new version.
181        for (compaction_group_id, sub_levels) in group_id_to_sub_levels {
182            let group_deltas = &mut new_version_delta
183                .group_deltas
184                .entry(compaction_group_id)
185                .or_default()
186                .group_deltas;
187
188            for sub_level in sub_levels {
189                group_deltas.push(GroupDelta::NewL0SubLevel(sub_level));
190            }
191        }
192
193        for (compaction_group_id, table_ids) in group_id_to_truncate_tables {
194            let group_deltas = &mut new_version_delta
195                .group_deltas
196                .entry(compaction_group_id)
197                .or_default()
198                .group_deltas;
199
200            group_deltas.push(GroupDelta::TruncateTables(table_ids.into_iter().collect()));
201        }
202
203        // update state table info
204        new_version_delta.with_latest_version(|version, delta| {
205            for (table_id, cg_id) in new_table_ids {
206                assert!(
207                    !version.state_table_info.info().contains_key(table_id),
208                    "newly added table exists previously: {:?}",
209                    table_id
210                );
211                let committed_epoch = *tables_to_commit.get(table_id).expect("newly added table must exist in tables_to_commit");
212                delta.state_table_info_delta.insert(
213                    *table_id,
214                    StateTableInfoDelta {
215                        committed_epoch,
216                        compaction_group_id: *cg_id,
217                    },
218                );
219            }
220
221            for (table_id, committed_epoch) in tables_to_commit {
222                if new_table_ids.contains_key(table_id) {
223                    continue;
224                }
225                let info = version.state_table_info.info().get(table_id).unwrap_or_else(|| {
226                    panic!("tables_to_commit {:?} contains table_id {} that is not newly added but not exists previously", tables_to_commit, table_id);
227                });
228                assert!(delta
229                    .state_table_info_delta
230                    .insert(
231                        *table_id,
232                        StateTableInfoDelta {
233                            committed_epoch: *committed_epoch,
234                            compaction_group_id: info.compaction_group_id,
235                        }
236                    )
237                    .is_none());
238            }
239        });
240
241        let time_travel_delta = (*new_version_delta).clone();
242        new_version_delta.pre_apply();
243        time_travel_delta
244    }
245}
246
247impl InMemValTransaction for HummockVersionTransaction<'_> {
248    fn commit(self) {
249        if let Some((version, deltas, gc_change_log_deltas)) = self.pre_applied_version {
250            *self.orig_version = version;
251            for delta in &deltas {
252                HummockVersion::apply_change_log_delta(
253                    self.orig_table_change_log,
254                    &delta.change_log_delta,
255                );
256            }
257            self.orig_table_change_log
258                .retain(|table_id, _| !gc_change_log_deltas.contains(table_id));
259
260            if !self.disable_apply_to_txn {
261                let pb_deltas = deltas.iter().map(|delta| delta.to_protobuf()).collect();
262                self.notification_manager.notify_hummock_without_version(
263                    Operation::Add,
264                    Info::HummockVersionDeltas(risingwave_pb::hummock::HummockVersionDeltas {
265                        version_deltas: pb_deltas,
266                    }),
267                );
268                self.notification_manager.notify_frontend_without_version(
269                    Operation::Update,
270                    Info::HummockVersionDeltas(HummockVersionDeltas {
271                        version_deltas: deltas
272                            .iter()
273                            .map(|delta| {
274                                FrontendHummockVersionDelta::from_delta(delta).to_protobuf()
275                            })
276                            .collect(),
277                    }),
278                );
279                if let Some(table_committed_epoch_notifiers) = self.table_committed_epoch_notifiers
280                {
281                    table_committed_epoch_notifiers
282                        .lock()
283                        .notify_deltas(&deltas);
284                }
285            }
286
287            for delta in deltas {
288                assert!(self.orig_deltas.insert(delta.id, delta.clone()).is_none());
289            }
290
291            trigger_delta_log_stats(self.meta_metrics, self.orig_deltas.len());
292            trigger_version_stat(self.meta_metrics, self.orig_version);
293        }
294    }
295}
296
297impl<TXN> ValTransaction<TXN> for HummockVersionTransaction<'_>
298where
299    TXN: ConnectionTrait,
300    HummockVersionDelta: Transactional<TXN>,
301    HummockVersionStats: Transactional<TXN>,
302{
303    async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
304        if self.disable_apply_to_txn {
305            return Ok(());
306        }
307        if let Some((_, deltas, gc_change_log_deltas)) = &self.pre_applied_version {
308            // These upsert_in_transaction can be batched. However, we know len(deltas) is always 1 currently.
309            for delta in deltas {
310                delta.upsert_in_transaction(txn).await?;
311            }
312
313            let insert_batch_size = self.opts.table_change_log_insert_batch_size as usize;
314            use futures::stream::{self, StreamExt};
315            use sea_orm::{ColumnTrait, Condition, QueryFilter};
316            let insert_iter = deltas
317                .iter()
318                .flat_map(|i| i.change_log_delta.iter())
319                .map(|(table_id, change_log_delta)| (*table_id, &change_log_delta.new_log));
320            let mut stream = stream::iter(insert_iter).chunks(insert_batch_size);
321            while let Some(change_log_batch) = stream.next().await {
322                let insert_many = change_log_batch
323                    .into_iter()
324                    .map(|(table_id, change_log)| {
325                        to_table_change_log_meta_store_model(table_id, change_log)
326                    })
327                    .collect::<Vec<_>>();
328                risingwave_meta_model::hummock_table_change_log::Entity::insert_many(insert_many)
329                    .on_empty_do_nothing()
330                    .exec(txn)
331                    .await?;
332            }
333
334            let delete_batch_size = self.opts.table_change_log_delete_batch_size as usize;
335            let delete_iter = deltas
336                .iter()
337                .flat_map(|i| i.change_log_delta.iter())
338                .map(|(table_id, change_log_delta)| (*table_id, change_log_delta.truncate_epoch))
339                .chain(
340                    gc_change_log_deltas
341                        .iter()
342                        .map(|table_id| (*table_id, u64::MAX)),
343                );
344
345            let mut stream = stream::iter(delete_iter).chunks(delete_batch_size);
346            while let Some(change_log_batch) = stream.next().await {
347                let mut condition = Condition::any();
348                for (table_id, truncate_epoch) in change_log_batch {
349                    condition = condition.add(
350                        Condition::all()
351                            .add(risingwave_meta_model::hummock_table_change_log::Column::TableId.eq(table_id))
352                            .add(risingwave_meta_model::hummock_table_change_log::Column::CheckpointEpoch.lt(truncate_epoch as Epoch))
353                    );
354                }
355                risingwave_meta_model::hummock_table_change_log::Entity::delete_many()
356                    .filter(condition)
357                    .exec(txn)
358                    .await?;
359            }
360        }
361        Ok(())
362    }
363}
364
365pub(super) struct SingleDeltaTransaction<'a, 'b> {
366    version_txn: &'b mut HummockVersionTransaction<'a>,
367    delta: Option<HummockVersionDelta>,
368}
369
370impl SingleDeltaTransaction<'_, '_> {
371    pub(super) fn latest_version(&self) -> &HummockVersion {
372        self.version_txn.latest_version()
373    }
374
375    pub(super) fn pre_apply(mut self) {
376        self.version_txn.pre_apply(self.delta.take().unwrap());
377    }
378
379    pub(super) fn with_latest_version(
380        &mut self,
381        f: impl FnOnce(&HummockVersion, &mut HummockVersionDelta),
382    ) {
383        f(
384            self.version_txn.latest_version(),
385            self.delta.as_mut().expect("should exist"),
386        )
387    }
388}
389
390impl Deref for SingleDeltaTransaction<'_, '_> {
391    type Target = HummockVersionDelta;
392
393    fn deref(&self) -> &Self::Target {
394        self.delta.as_ref().expect("should exist")
395    }
396}
397
398impl DerefMut for SingleDeltaTransaction<'_, '_> {
399    fn deref_mut(&mut self) -> &mut Self::Target {
400        self.delta.as_mut().expect("should exist")
401    }
402}
403
404impl Drop for SingleDeltaTransaction<'_, '_> {
405    fn drop(&mut self) {
406        if let Some(delta) = self.delta.take() {
407            self.version_txn.pre_apply(delta);
408        }
409    }
410}
411
412pub(super) struct HummockVersionStatsTransaction<'a> {
413    stats: VarTransaction<'a, HummockVersionStats>,
414    notification_manager: &'a NotificationManager,
415}
416
417impl<'a> HummockVersionStatsTransaction<'a> {
418    pub(super) fn new(
419        stats: &'a mut HummockVersionStats,
420        notification_manager: &'a NotificationManager,
421    ) -> Self {
422        Self {
423            stats: VarTransaction::new(stats),
424            notification_manager,
425        }
426    }
427}
428
429impl InMemValTransaction for HummockVersionStatsTransaction<'_> {
430    fn commit(self) {
431        if self.stats.has_new_value() {
432            let stats = self.stats.clone();
433            self.stats.commit();
434            self.notification_manager
435                .notify_frontend_without_version(Operation::Update, Info::HummockStats(stats));
436        }
437    }
438}
439
440impl<TXN> ValTransaction<TXN> for HummockVersionStatsTransaction<'_>
441where
442    TXN: ConnectionTrait,
443    HummockVersionStats: Transactional<TXN>,
444{
445    async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
446        self.stats.apply_to_txn(txn).await
447    }
448}
449
450impl Deref for HummockVersionStatsTransaction<'_> {
451    type Target = HummockVersionStats;
452
453    fn deref(&self) -> &Self::Target {
454        self.stats.deref()
455    }
456}
457
458impl DerefMut for HummockVersionStatsTransaction<'_> {
459    fn deref_mut(&mut self) -> &mut Self::Target {
460        self.stats.deref_mut()
461    }
462}