risingwave_meta/hummock/manager/
transaction.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ops::{Deref, DerefMut};
17
18use parking_lot::Mutex;
19use risingwave_common::catalog::TableId;
20use risingwave_hummock_sdk::change_log::{ChangeLogDelta, TableChangeLog};
21use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
22use risingwave_hummock_sdk::sstable_info::SstableInfo;
23use risingwave_hummock_sdk::table_watermark::TableWatermarks;
24use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
25use risingwave_hummock_sdk::version::{GroupDelta, HummockVersion, HummockVersionDelta};
26use risingwave_hummock_sdk::{
27    CompactionGroupId, FrontendHummockVersionDelta, HummockSstableId, HummockVersionId,
28};
29use risingwave_meta_model::Epoch;
30use risingwave_pb::hummock::{
31    CompatibilityVersion, GroupConstruct, HummockVersionDeltas, HummockVersionStats,
32    StateTableInfoDelta,
33};
34use risingwave_pb::meta::subscribe_response::{Info, Operation};
35use sea_orm::{ConnectionTrait, EntityTrait};
36
37use super::TableCommittedEpochNotifiers;
38use crate::hummock::model::CompactionGroup;
39use crate::hummock::model::ext::to_table_change_log_meta_store_model;
40use crate::manager::{MetaOpts, NotificationManager};
41use crate::model::{
42    InMemValTransaction, MetadataModelResult, Transactional, ValTransaction, VarTransaction,
43};
44use crate::rpc::metrics::MetaMetrics;
45
46fn trigger_delta_log_stats(metrics: &MetaMetrics, total_number: usize) {
47    metrics.delta_log_count.set(total_number as _);
48}
49
50fn trigger_version_stat(metrics: &MetaMetrics, current_version: &HummockVersion) {
51    metrics
52        .version_size
53        .set(current_version.estimated_encode_len() as i64);
54    metrics
55        .current_version_id
56        .set(current_version.id.as_i64_id());
57}
58
/// A transaction over the hummock version, its version deltas and the table change logs.
///
/// Deltas are first pre-applied to a private copy of the version (`pre_applied_version`).
/// The borrowed originals are only written when the transaction is committed.
pub(super) struct HummockVersionTransaction<'a> {
    /// The version the transaction started from; replaced by the pre-applied version on commit.
    orig_version: &'a mut HummockVersion,
    /// Version deltas keyed by id; the pre-applied deltas are inserted here on commit.
    orig_deltas: &'a mut BTreeMap<HummockVersionId, HummockVersionDelta>,
    /// Per-table change logs; updated (and GC'ed) on commit.
    orig_table_change_log: &'a mut HashMap<TableId, TableChangeLog>,
    /// Used on commit to publish the committed deltas.
    notification_manager: &'a NotificationManager,
    /// Optional notifiers that receive the committed deltas on commit.
    table_committed_epoch_notifiers: Option<&'a Mutex<TableCommittedEpochNotifiers>>,
    /// Metrics refreshed on commit.
    meta_metrics: &'a MetaMetrics,

    /// `None` until the first delta is pre-applied. Afterwards holds the working copy of the
    /// version, the deltas applied to it so far, and the ids of the tables whose change log is
    /// to be garbage collected.
    pre_applied_version: Option<(HummockVersion, Vec<HummockVersionDelta>, HashSet<TableId>)>,
    /// When set, `apply_to_txn` writes nothing and `commit` sends no notifications.
    disable_apply_to_txn: bool,
    /// Options read by `apply_to_txn` for the change log insert/delete batch sizes.
    opts: &'a MetaOpts,
}
71
72impl<'a> HummockVersionTransaction<'a> {
73    pub(super) fn new(
74        version: &'a mut HummockVersion,
75        deltas: &'a mut BTreeMap<HummockVersionId, HummockVersionDelta>,
76        table_change_log: &'a mut HashMap<TableId, TableChangeLog>,
77        notification_manager: &'a NotificationManager,
78        table_committed_epoch_notifiers: Option<&'a Mutex<TableCommittedEpochNotifiers>>,
79        meta_metrics: &'a MetaMetrics,
80        opts: &'a MetaOpts,
81    ) -> Self {
82        Self {
83            orig_version: version,
84            orig_deltas: deltas,
85            orig_table_change_log: table_change_log,
86            pre_applied_version: None,
87            disable_apply_to_txn: false,
88            notification_manager,
89            table_committed_epoch_notifiers,
90            meta_metrics,
91            opts,
92        }
93    }
94
95    pub(super) fn disable_apply_to_txn(&mut self) {
96        assert!(
97            self.pre_applied_version.is_none(),
98            "should only call disable at the beginning of txn"
99        );
100        self.disable_apply_to_txn = true;
101    }
102
103    pub(super) fn latest_version(&self) -> &HummockVersion {
104        if let Some((version, _, _)) = &self.pre_applied_version {
105            version
106        } else {
107            self.orig_version
108        }
109    }
110
111    pub(super) fn new_delta<'b>(&'b mut self) -> SingleDeltaTransaction<'a, 'b> {
112        let delta = self.latest_version().version_delta_after();
113        SingleDeltaTransaction {
114            version_txn: self,
115            delta: Some(delta),
116        }
117    }
118
119    fn pre_apply(&mut self, delta: HummockVersionDelta) {
120        let (version, deltas, gc_change_log_deltas) =
121            self.pre_applied_version.get_or_insert_with(|| {
122                (
123                    self.orig_version.clone(),
124                    Vec::with_capacity(1),
125                    HashSet::new(),
126                )
127            });
128        let changed_table_info = version.apply_version_delta(&delta);
129        // Ideally, the first parameter should be the cumulative state (orig_table_change_log + all applied deltas).
130        // However, currently, we use orig_table_change_log directly because deltas are only applied after a successful metastore write in the end of the transaction.
131        // Consequently, some table eligible for GC are not returned by collect_gc_change_log_delta and are deferred to the next transaction.
132        // This delay is acceptable and does not impact system correctness.
133        let gc_change_log_delta = HummockVersion::collect_gc_change_log_delta(
134            self.orig_table_change_log.keys(),
135            &delta.change_log_delta,
136            &delta.removed_table_ids,
137            &delta.state_table_info_delta,
138            &changed_table_info,
139        );
140        gc_change_log_deltas.extend(gc_change_log_delta);
141        deltas.push(delta);
142    }
143
144    /// Returns a duplicate delta, used by time travel.
145    pub(super) fn pre_commit_epoch(
146        &mut self,
147        tables_to_commit: &HashMap<TableId, u64>,
148        new_compaction_groups: Vec<CompactionGroup>,
149        group_id_to_sub_levels: BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>>,
150        new_table_ids: &HashMap<TableId, CompactionGroupId>,
151        new_table_watermarks: HashMap<TableId, TableWatermarks>,
152        change_log_delta: HashMap<TableId, ChangeLogDelta>,
153        vector_index_delta: HashMap<TableId, VectorIndexDelta>,
154        group_id_to_truncate_tables: HashMap<CompactionGroupId, HashSet<TableId>>,
155    ) -> HummockVersionDelta {
156        let mut new_version_delta = self.new_delta();
157        new_version_delta.new_table_watermarks = new_table_watermarks;
158        new_version_delta.change_log_delta = change_log_delta;
159        new_version_delta.vector_index_delta = vector_index_delta;
160
161        for compaction_group in &new_compaction_groups {
162            let group_deltas = &mut new_version_delta
163                .group_deltas
164                .entry(compaction_group.group_id())
165                .or_default()
166                .group_deltas;
167
168            #[expect(deprecated)]
169            group_deltas.push(GroupDelta::GroupConstruct(Box::new(GroupConstruct {
170                group_config: Some(compaction_group.compaction_config().as_ref().clone()),
171                group_id: compaction_group.group_id(),
172                parent_group_id: StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId,
173                new_sst_start_id: HummockSstableId::default(), // No need to set it when `NewCompactionGroup`
174                table_ids: vec![],
175                version: CompatibilityVersion::LATEST as _,
176                split_key: None,
177            })));
178        }
179
180        // Append SSTs to a new version.
181        for (compaction_group_id, sub_levels) in group_id_to_sub_levels {
182            let group_deltas = &mut new_version_delta
183                .group_deltas
184                .entry(compaction_group_id)
185                .or_default()
186                .group_deltas;
187
188            for sub_level in sub_levels {
189                group_deltas.push(GroupDelta::NewL0SubLevel(sub_level));
190            }
191        }
192
193        for (compaction_group_id, table_ids) in group_id_to_truncate_tables {
194            let group_deltas = &mut new_version_delta
195                .group_deltas
196                .entry(compaction_group_id)
197                .or_default()
198                .group_deltas;
199
200            group_deltas.push(GroupDelta::PruneTableIdsFromSsts(
201                table_ids.into_iter().collect(),
202            ));
203        }
204
205        // update state table info
206        new_version_delta.with_latest_version(|version, delta| {
207            for (table_id, cg_id) in new_table_ids {
208                assert!(
209                    !version.state_table_info.info().contains_key(table_id),
210                    "newly added table exists previously: {:?}",
211                    table_id
212                );
213                let committed_epoch = *tables_to_commit.get(table_id).expect("newly added table must exist in tables_to_commit");
214                delta.state_table_info_delta.insert(
215                    *table_id,
216                    StateTableInfoDelta {
217                        committed_epoch,
218                        compaction_group_id: *cg_id,
219                    },
220                );
221            }
222
223            for (table_id, committed_epoch) in tables_to_commit {
224                if new_table_ids.contains_key(table_id) {
225                    continue;
226                }
227                let info = version.state_table_info.info().get(table_id).unwrap_or_else(|| {
228                    panic!("tables_to_commit {:?} contains table_id {} that is not newly added but not exists previously", tables_to_commit, table_id);
229                });
230                assert!(delta
231                    .state_table_info_delta
232                    .insert(
233                        *table_id,
234                        StateTableInfoDelta {
235                            committed_epoch: *committed_epoch,
236                            compaction_group_id: info.compaction_group_id,
237                        }
238                    )
239                    .is_none());
240            }
241        });
242
243        let time_travel_delta = (*new_version_delta).clone();
244        new_version_delta.pre_apply();
245        time_travel_delta
246    }
247}
248
249impl InMemValTransaction for HummockVersionTransaction<'_> {
250    fn commit(self) {
251        if let Some((version, deltas, gc_change_log_deltas)) = self.pre_applied_version {
252            *self.orig_version = version;
253            for delta in &deltas {
254                HummockVersion::apply_change_log_delta(
255                    self.orig_table_change_log,
256                    &delta.change_log_delta,
257                );
258            }
259            self.orig_table_change_log
260                .retain(|table_id, _| !gc_change_log_deltas.contains(table_id));
261
262            if !self.disable_apply_to_txn {
263                let pb_deltas = deltas.iter().map(|delta| delta.to_protobuf()).collect();
264                self.notification_manager.notify_hummock_without_version(
265                    Operation::Add,
266                    Info::HummockVersionDeltas(risingwave_pb::hummock::HummockVersionDeltas {
267                        version_deltas: pb_deltas,
268                    }),
269                );
270                self.notification_manager.notify_frontend_without_version(
271                    Operation::Update,
272                    Info::HummockVersionDeltas(HummockVersionDeltas {
273                        version_deltas: deltas
274                            .iter()
275                            .map(|delta| {
276                                FrontendHummockVersionDelta::from_delta(delta).to_protobuf()
277                            })
278                            .collect(),
279                    }),
280                );
281                if let Some(table_committed_epoch_notifiers) = self.table_committed_epoch_notifiers
282                {
283                    table_committed_epoch_notifiers
284                        .lock()
285                        .notify_deltas(&deltas);
286                }
287            }
288
289            for delta in deltas {
290                assert!(self.orig_deltas.insert(delta.id, delta.clone()).is_none());
291            }
292
293            trigger_delta_log_stats(self.meta_metrics, self.orig_deltas.len());
294            trigger_version_stat(self.meta_metrics, self.orig_version);
295        }
296    }
297}
298
299impl<TXN> ValTransaction<TXN> for HummockVersionTransaction<'_>
300where
301    TXN: ConnectionTrait,
302    HummockVersionDelta: Transactional<TXN>,
303    HummockVersionStats: Transactional<TXN>,
304{
305    async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
306        if self.disable_apply_to_txn {
307            return Ok(());
308        }
309        if let Some((_, deltas, gc_change_log_deltas)) = &self.pre_applied_version {
310            // These upsert_in_transaction can be batched. However, we know len(deltas) is always 1 currently.
311            for delta in deltas {
312                delta.upsert_in_transaction(txn).await?;
313            }
314
315            let insert_batch_size = self.opts.table_change_log_insert_batch_size as usize;
316            use futures::stream::{self, StreamExt};
317            use sea_orm::{ColumnTrait, Condition, QueryFilter};
318            let insert_iter = deltas
319                .iter()
320                .flat_map(|i| i.change_log_delta.iter())
321                .map(|(table_id, change_log_delta)| (*table_id, &change_log_delta.new_log));
322            let mut stream = stream::iter(insert_iter).chunks(insert_batch_size);
323            while let Some(change_log_batch) = stream.next().await {
324                let insert_many = change_log_batch
325                    .into_iter()
326                    .map(|(table_id, change_log)| {
327                        to_table_change_log_meta_store_model(table_id, change_log)
328                    })
329                    .collect::<Vec<_>>();
330                risingwave_meta_model::hummock_table_change_log::Entity::insert_many(insert_many)
331                    .on_empty_do_nothing()
332                    .exec(txn)
333                    .await?;
334            }
335
336            let delete_batch_size = self.opts.table_change_log_delete_batch_size as usize;
337            let delete_iter = deltas
338                .iter()
339                .flat_map(|i| i.change_log_delta.iter())
340                .map(|(table_id, change_log_delta)| (*table_id, change_log_delta.truncate_epoch))
341                .chain(
342                    gc_change_log_deltas
343                        .iter()
344                        .map(|table_id| (*table_id, u64::MAX)),
345                );
346
347            let mut stream = stream::iter(delete_iter).chunks(delete_batch_size);
348            while let Some(change_log_batch) = stream.next().await {
349                let mut condition = Condition::any();
350                for (table_id, truncate_epoch) in change_log_batch {
351                    condition = condition.add(
352                        Condition::all()
353                            .add(risingwave_meta_model::hummock_table_change_log::Column::TableId.eq(table_id))
354                            .add(risingwave_meta_model::hummock_table_change_log::Column::CheckpointEpoch.lt(truncate_epoch as Epoch))
355                    );
356                }
357                risingwave_meta_model::hummock_table_change_log::Entity::delete_many()
358                    .filter(condition)
359                    .exec(txn)
360                    .await?;
361            }
362        }
363        Ok(())
364    }
365}
366
/// A guard around one version delta that is being built on top of a
/// `HummockVersionTransaction`.
///
/// The delta is handed to the parent transaction's `pre_apply` either through
/// `SingleDeltaTransaction::pre_apply` or, if that was not called, when the guard is dropped.
pub(super) struct SingleDeltaTransaction<'a, 'b> {
    /// The transaction this delta will be pre-applied to.
    version_txn: &'b mut HummockVersionTransaction<'a>,
    /// The delta under construction; `None` once it has been handed to `version_txn`.
    delta: Option<HummockVersionDelta>,
}
371
372impl SingleDeltaTransaction<'_, '_> {
373    pub(super) fn latest_version(&self) -> &HummockVersion {
374        self.version_txn.latest_version()
375    }
376
377    pub(super) fn pre_apply(mut self) {
378        self.version_txn.pre_apply(self.delta.take().unwrap());
379    }
380
381    pub(super) fn with_latest_version(
382        &mut self,
383        f: impl FnOnce(&HummockVersion, &mut HummockVersionDelta),
384    ) {
385        f(
386            self.version_txn.latest_version(),
387            self.delta.as_mut().expect("should exist"),
388        )
389    }
390}
391
392impl Deref for SingleDeltaTransaction<'_, '_> {
393    type Target = HummockVersionDelta;
394
395    fn deref(&self) -> &Self::Target {
396        self.delta.as_ref().expect("should exist")
397    }
398}
399
400impl DerefMut for SingleDeltaTransaction<'_, '_> {
401    fn deref_mut(&mut self) -> &mut Self::Target {
402        self.delta.as_mut().expect("should exist")
403    }
404}
405
406impl Drop for SingleDeltaTransaction<'_, '_> {
407    fn drop(&mut self) {
408        if let Some(delta) = self.delta.take() {
409            self.version_txn.pre_apply(delta);
410        }
411    }
412}
413
/// A transaction over `HummockVersionStats` that notifies the frontend when a new value is
/// committed.
pub(super) struct HummockVersionStatsTransaction<'a> {
    /// The underlying transaction holding the stats.
    stats: VarTransaction<'a, HummockVersionStats>,
    /// Used on commit to publish the new stats.
    notification_manager: &'a NotificationManager,
}
418
419impl<'a> HummockVersionStatsTransaction<'a> {
420    pub(super) fn new(
421        stats: &'a mut HummockVersionStats,
422        notification_manager: &'a NotificationManager,
423    ) -> Self {
424        Self {
425            stats: VarTransaction::new(stats),
426            notification_manager,
427        }
428    }
429}
430
431impl InMemValTransaction for HummockVersionStatsTransaction<'_> {
432    fn commit(self) {
433        if self.stats.has_new_value() {
434            let stats = self.stats.clone();
435            self.stats.commit();
436            self.notification_manager
437                .notify_frontend_without_version(Operation::Update, Info::HummockStats(stats));
438        }
439    }
440}
441
/// Meta store persistence for the stats transaction.
impl<TXN> ValTransaction<TXN> for HummockVersionStatsTransaction<'_>
where
    TXN: ConnectionTrait,
    HummockVersionStats: Transactional<TXN>,
{
    /// Delegates to the wrapped `VarTransaction` and returns its result unchanged.
    async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> {
        self.stats.apply_to_txn(txn).await
    }
}
451
452impl Deref for HummockVersionStatsTransaction<'_> {
453    type Target = HummockVersionStats;
454
455    fn deref(&self) -> &Self::Target {
456        self.stats.deref()
457    }
458}
459
460impl DerefMut for HummockVersionStatsTransaction<'_> {
461    fn deref_mut(&mut self) -> &mut Self::Target {
462        self.stats.deref_mut()
463    }
464}