risingwave_meta/hummock/manager/
time_travel.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet, VecDeque};
16
17use anyhow::anyhow;
18use risingwave_common::catalog::TableId;
19use risingwave_common::system_param::reader::SystemParamsRead;
20use risingwave_common::util::epoch::Epoch;
21use risingwave_hummock_sdk::compaction_group::StateTableId;
22use risingwave_hummock_sdk::sstable_info::SstableInfo;
23use risingwave_hummock_sdk::time_travel::{
24    IncompleteHummockVersion, IncompleteHummockVersionDelta, refill_version,
25};
26use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion, HummockVersionDelta};
27use risingwave_hummock_sdk::{
28    CompactionGroupId, HummockEpoch, HummockSstableId, HummockSstableObjectId,
29};
30use risingwave_meta_model::hummock_sstable_info::SstableInfoV2Backend;
31use risingwave_meta_model::{
32    hummock_epoch_to_version, hummock_sstable_info, hummock_time_travel_delta,
33    hummock_time_travel_version,
34};
35use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta};
36use sea_orm::ActiveValue::Set;
37use sea_orm::{
38    ColumnTrait, Condition, DatabaseTransaction, EntityTrait, PaginatorTrait, QueryFilter,
39    QueryOrder, QuerySelect, TransactionTrait,
40};
41
42use crate::hummock::HummockManager;
43use crate::hummock::error::{Error, Result};
44
45/// Time travel.
46impl HummockManager {
47    pub(crate) async fn init_time_travel_state(&self) -> Result<()> {
48        let sql_store = self.env.meta_store_ref();
49        let mut guard = self.versioning.write().await;
50        guard.mark_next_time_travel_version_snapshot();
51
52        guard.last_time_travel_snapshot_sst_ids = HashSet::new();
53        let Some(version) = hummock_time_travel_version::Entity::find()
54            .order_by_desc(hummock_time_travel_version::Column::VersionId)
55            .one(&sql_store.conn)
56            .await?
57            .map(|v| IncompleteHummockVersion::from_persisted_protobuf(&v.version.to_protobuf()))
58        else {
59            return Ok(());
60        };
61        guard.last_time_travel_snapshot_sst_ids = version.get_sst_ids(true);
62        Ok(())
63    }
64
65    pub(crate) async fn truncate_time_travel_metadata(
66        &self,
67        epoch_watermark: HummockEpoch,
68    ) -> Result<()> {
69        let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
70        let sql_store = self.env.meta_store_ref();
71        let txn = sql_store.conn.begin().await?;
72        let version_watermark = hummock_epoch_to_version::Entity::find()
73            .filter(
74                hummock_epoch_to_version::Column::Epoch
75                    .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
76            )
77            .order_by_desc(hummock_epoch_to_version::Column::Epoch)
78            .order_by_asc(hummock_epoch_to_version::Column::VersionId)
79            .one(&txn)
80            .await?;
81        let Some(version_watermark) = version_watermark else {
82            txn.commit().await?;
83            return Ok(());
84        };
85        let watermark_version_id = std::cmp::min(
86            version_watermark.version_id,
87            min_pinned_version_id.to_u64().try_into().unwrap(),
88        );
89        let res = hummock_epoch_to_version::Entity::delete_many()
90            .filter(
91                hummock_epoch_to_version::Column::Epoch
92                    .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
93            )
94            .exec(&txn)
95            .await?;
96        tracing::debug!(
97            epoch_watermark,
98            "delete {} rows from hummock_epoch_to_version",
99            res.rows_affected
100        );
101        let latest_valid_version = hummock_time_travel_version::Entity::find()
102            .filter(hummock_time_travel_version::Column::VersionId.lte(watermark_version_id))
103            .order_by_desc(hummock_time_travel_version::Column::VersionId)
104            .one(&txn)
105            .await?
106            .map(|m| IncompleteHummockVersion::from_persisted_protobuf(&m.version.to_protobuf()));
107        let Some(latest_valid_version) = latest_valid_version else {
108            txn.commit().await?;
109            return Ok(());
110        };
111        let (
112            latest_valid_version_id,
113            latest_valid_version_sst_ids,
114            latest_valid_version_object_ids,
115        ) = {
116            (
117                latest_valid_version.id,
118                latest_valid_version.get_sst_ids(true),
119                latest_valid_version.get_object_ids(true),
120            )
121        };
122        let mut object_ids_to_delete: HashSet<_> = HashSet::default();
123        let version_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
124            hummock_time_travel_version::Entity::find()
125                .select_only()
126                .column(hummock_time_travel_version::Column::VersionId)
127                .filter(
128                    hummock_time_travel_version::Column::VersionId
129                        .lt(latest_valid_version_id.to_u64()),
130                )
131                .order_by_desc(hummock_time_travel_version::Column::VersionId)
132                .into_tuple()
133                .all(&txn)
134                .await?;
135        let delta_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
136            hummock_time_travel_delta::Entity::find()
137                .select_only()
138                .column(hummock_time_travel_delta::Column::VersionId)
139                .filter(
140                    hummock_time_travel_delta::Column::VersionId
141                        .lt(latest_valid_version_id.to_u64()),
142                )
143                .into_tuple()
144                .all(&txn)
145                .await?;
146        // Reuse hummock_time_travel_epoch_version_insert_batch_size as threshold.
147        let delete_sst_batch_size = self
148            .env
149            .opts
150            .hummock_time_travel_epoch_version_insert_batch_size;
151        let mut sst_ids_to_delete: HashSet<_> = HashSet::default();
152        async fn delete_sst_in_batch(
153            txn: &DatabaseTransaction,
154            sst_ids_to_delete: HashSet<HummockSstableId>,
155            delete_sst_batch_size: usize,
156        ) -> Result<()> {
157            for start_idx in 0..=(sst_ids_to_delete.len().saturating_sub(1) / delete_sst_batch_size)
158            {
159                hummock_sstable_info::Entity::delete_many()
160                    .filter(
161                        hummock_sstable_info::Column::SstId.is_in(
162                            sst_ids_to_delete
163                                .iter()
164                                .skip(start_idx * delete_sst_batch_size)
165                                .take(delete_sst_batch_size)
166                                .copied(),
167                        ),
168                    )
169                    .exec(txn)
170                    .await?;
171            }
172            Ok(())
173        }
174        for delta_id_to_delete in delta_ids_to_delete {
175            let delta_to_delete = hummock_time_travel_delta::Entity::find_by_id(delta_id_to_delete)
176                .one(&txn)
177                .await?
178                .ok_or_else(|| {
179                    Error::TimeTravel(anyhow!(format!(
180                        "version delta {} not found",
181                        delta_id_to_delete
182                    )))
183                })?;
184            let delta_to_delete = IncompleteHummockVersionDelta::from_persisted_protobuf(
185                &delta_to_delete.version_delta.to_protobuf(),
186            );
187            let new_sst_ids = delta_to_delete.newly_added_sst_ids(true);
188            // The SST ids added and then deleted by compaction between the 2 versions.
189            sst_ids_to_delete.extend(&new_sst_ids - &latest_valid_version_sst_ids);
190            if sst_ids_to_delete.len() >= delete_sst_batch_size {
191                delete_sst_in_batch(
192                    &txn,
193                    std::mem::take(&mut sst_ids_to_delete),
194                    delete_sst_batch_size,
195                )
196                .await?;
197            }
198            let new_object_ids = delta_to_delete.newly_added_object_ids(true);
199            object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
200        }
201        let mut next_version_sst_ids = latest_valid_version_sst_ids;
202        for prev_version_id in version_ids_to_delete {
203            let prev_version = {
204                let prev_version = hummock_time_travel_version::Entity::find_by_id(prev_version_id)
205                    .one(&txn)
206                    .await?
207                    .ok_or_else(|| {
208                        Error::TimeTravel(anyhow!(format!(
209                            "prev_version {} not found",
210                            prev_version_id
211                        )))
212                    })?;
213                IncompleteHummockVersion::from_persisted_protobuf(
214                    &prev_version.version.to_protobuf(),
215                )
216            };
217            let sst_ids = prev_version.get_sst_ids(true);
218            // The SST ids deleted by compaction between the 2 versions.
219            sst_ids_to_delete.extend(&sst_ids - &next_version_sst_ids);
220            if sst_ids_to_delete.len() >= delete_sst_batch_size {
221                delete_sst_in_batch(
222                    &txn,
223                    std::mem::take(&mut sst_ids_to_delete),
224                    delete_sst_batch_size,
225                )
226                .await?;
227            }
228            let new_object_ids = prev_version.get_object_ids(true);
229            object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
230            next_version_sst_ids = sst_ids;
231        }
232        if !sst_ids_to_delete.is_empty() {
233            delete_sst_in_batch(&txn, sst_ids_to_delete, delete_sst_batch_size).await?;
234        }
235
236        if !object_ids_to_delete.is_empty() {
237            // IMPORTANT: object_ids_to_delete may include objects that are still being used by SSTs not included in time travel metadata.
238            // So it's crucial to filter out those objects before actually deleting them, i.e. when using `try_take_may_delete_object_ids`.
239            self.gc_manager
240                .add_may_delete_object_ids(object_ids_to_delete.into_iter());
241        }
242
243        let res = hummock_time_travel_version::Entity::delete_many()
244            .filter(
245                hummock_time_travel_version::Column::VersionId.lt(latest_valid_version_id.to_u64()),
246            )
247            .exec(&txn)
248            .await?;
249        tracing::debug!(
250            epoch_watermark_version_id = ?watermark_version_id,
251            ?latest_valid_version_id,
252            "delete {} rows from hummock_time_travel_version",
253            res.rows_affected
254        );
255
256        let res = hummock_time_travel_delta::Entity::delete_many()
257            .filter(
258                hummock_time_travel_delta::Column::VersionId.lt(latest_valid_version_id.to_u64()),
259            )
260            .exec(&txn)
261            .await?;
262        tracing::debug!(
263            epoch_watermark_version_id = ?watermark_version_id,
264            ?latest_valid_version_id,
265            "delete {} rows from hummock_time_travel_delta",
266            res.rows_affected
267        );
268
269        txn.commit().await?;
270        Ok(())
271    }
272
273    pub(crate) async fn filter_out_objects_by_time_travel(
274        &self,
275        objects: impl Iterator<Item = HummockSstableObjectId>,
276        batch_size: usize,
277    ) -> Result<HashSet<HummockSstableObjectId>> {
278        // The input object count is much smaller than time travel pinned object count in meta store.
279        // So search input object in meta store.
280        let mut result: HashSet<_> = objects.collect();
281        let mut remain: VecDeque<_> = result.iter().copied().collect();
282        while !remain.is_empty() {
283            let batch = remain.drain(..std::cmp::min(remain.len(), batch_size));
284            let reject_object_ids: Vec<risingwave_meta_model::HummockSstableObjectId> =
285                hummock_sstable_info::Entity::find()
286                    .filter(hummock_sstable_info::Column::ObjectId.is_in(batch))
287                    .select_only()
288                    .column(hummock_sstable_info::Column::ObjectId)
289                    .into_tuple()
290                    .all(&self.env.meta_store_ref().conn)
291                    .await?;
292            for reject in reject_object_ids {
293                let object_id = HummockSstableObjectId::try_from(reject).unwrap();
294                result.remove(&object_id);
295            }
296        }
297        Ok(result)
298    }
299
300    pub(crate) async fn time_travel_pinned_object_count(&self) -> Result<u64> {
301        let count = hummock_sstable_info::Entity::find()
302            .count(&self.env.meta_store_ref().conn)
303            .await?;
304        Ok(count)
305    }
306
307    /// Attempt to locate the version corresponding to `query_epoch`.
308    ///
309    /// The version is retrieved from `hummock_epoch_to_version`, selecting the entry with the largest epoch that's lte `query_epoch`.
310    ///
311    /// The resulted version is complete, i.e. with correct `SstableInfo`.
312    pub async fn epoch_to_version(
313        &self,
314        query_epoch: HummockEpoch,
315        table_id: u32,
316    ) -> Result<HummockVersion> {
317        let sql_store = self.env.meta_store_ref();
318        let _permit = self.inflight_time_travel_query.try_acquire().map_err(|_| {
319            anyhow!(format!(
320                "too many inflight time travel queries, max_inflight_time_travel_query={}",
321                self.env.opts.max_inflight_time_travel_query
322            ))
323        })?;
324        let epoch_to_version = hummock_epoch_to_version::Entity::find()
325            .filter(
326                Condition::any()
327                    .add(hummock_epoch_to_version::Column::TableId.eq(i64::from(table_id)))
328                    // for backward compatibility
329                    .add(hummock_epoch_to_version::Column::TableId.eq(0)),
330            )
331            .filter(
332                hummock_epoch_to_version::Column::Epoch
333                    .lte(risingwave_meta_model::Epoch::try_from(query_epoch).unwrap()),
334            )
335            .order_by_desc(hummock_epoch_to_version::Column::Epoch)
336            .one(&sql_store.conn)
337            .await?
338            .ok_or_else(|| {
339                Error::TimeTravel(anyhow!(format!(
340                    "version not found for epoch {}",
341                    query_epoch
342                )))
343            })?;
344        let timer = self
345            .metrics
346            .time_travel_version_replay_latency
347            .start_timer();
348        let actual_version_id = epoch_to_version.version_id;
349        tracing::debug!(
350            query_epoch,
351            query_tz = ?(Epoch(query_epoch).as_timestamptz()),
352            actual_epoch = epoch_to_version.epoch,
353            actual_tz = ?(Epoch(u64::try_from(epoch_to_version.epoch).unwrap()).as_timestamptz()),
354            actual_version_id,
355            "convert query epoch"
356        );
357
358        let replay_version = hummock_time_travel_version::Entity::find()
359            .filter(hummock_time_travel_version::Column::VersionId.lte(actual_version_id))
360            .order_by_desc(hummock_time_travel_version::Column::VersionId)
361            .one(&sql_store.conn)
362            .await?
363            .ok_or_else(|| {
364                Error::TimeTravel(anyhow!(format!(
365                    "no replay version found for epoch {}, version {}",
366                    query_epoch, actual_version_id,
367                )))
368            })?;
369        let deltas = hummock_time_travel_delta::Entity::find()
370            .filter(hummock_time_travel_delta::Column::VersionId.gt(replay_version.version_id))
371            .filter(hummock_time_travel_delta::Column::VersionId.lte(actual_version_id))
372            .order_by_asc(hummock_time_travel_delta::Column::VersionId)
373            .all(&sql_store.conn)
374            .await?;
375        // SstableInfo in actual_version is incomplete before refill_version.
376        let mut actual_version = replay_archive(
377            replay_version.version.to_protobuf(),
378            deltas.into_iter().map(|d| d.version_delta.to_protobuf()),
379        );
380
381        let mut sst_ids = actual_version
382            .get_sst_ids(true)
383            .into_iter()
384            .collect::<VecDeque<_>>();
385        let sst_count = sst_ids.len();
386        let mut sst_id_to_info = HashMap::with_capacity(sst_count);
387        let sst_info_fetch_batch_size = self.env.opts.hummock_time_travel_sst_info_fetch_batch_size;
388        while !sst_ids.is_empty() {
389            let sst_infos = hummock_sstable_info::Entity::find()
390                .filter(hummock_sstable_info::Column::SstId.is_in(
391                    sst_ids.drain(..std::cmp::min(sst_info_fetch_batch_size, sst_ids.len())),
392                ))
393                .all(&sql_store.conn)
394                .await?;
395            for sst_info in sst_infos {
396                let sst_info: SstableInfo = sst_info.sstable_info.to_protobuf().into();
397                sst_id_to_info.insert(sst_info.sst_id, sst_info);
398            }
399        }
400        if sst_count != sst_id_to_info.len() {
401            return Err(Error::TimeTravel(anyhow!(format!(
402                "some SstableInfos not found for epoch {}, version {}",
403                query_epoch, actual_version_id,
404            ))));
405        }
406        refill_version(&mut actual_version, &sst_id_to_info, table_id);
407        timer.observe_duration();
408        Ok(actual_version)
409    }
410
411    pub(crate) async fn write_time_travel_metadata(
412        &self,
413        txn: &DatabaseTransaction,
414        version: Option<&HummockVersion>,
415        delta: HummockVersionDelta,
416        time_travel_table_ids: HashSet<StateTableId>,
417        skip_sst_ids: &HashSet<HummockSstableId>,
418        tables_to_commit: impl Iterator<Item = (&TableId, &CompactionGroupId, u64)>,
419    ) -> Result<Option<HashSet<HummockSstableId>>> {
420        if self
421            .env
422            .system_params_reader()
423            .await
424            .time_travel_retention_ms()
425            == 0
426        {
427            return Ok(None);
428        }
429        async fn write_sstable_infos(
430            mut sst_infos: impl Iterator<Item = &SstableInfo>,
431            txn: &DatabaseTransaction,
432            batch_size: usize,
433        ) -> Result<usize> {
434            let mut count = 0;
435            let mut is_finished = false;
436            while !is_finished {
437                let mut remain = batch_size;
438                let mut batch = vec![];
439                while remain > 0 {
440                    let Some(sst_info) = sst_infos.next() else {
441                        is_finished = true;
442                        break;
443                    };
444                    batch.push(hummock_sstable_info::ActiveModel {
445                        sst_id: Set(sst_info.sst_id.try_into().unwrap()),
446                        object_id: Set(sst_info.object_id.try_into().unwrap()),
447                        sstable_info: Set(SstableInfoV2Backend::from(&sst_info.to_protobuf())),
448                    });
449                    remain -= 1;
450                    count += 1;
451                }
452                if batch.is_empty() {
453                    break;
454                }
455                hummock_sstable_info::Entity::insert_many(batch)
456                    .on_conflict_do_nothing()
457                    .exec(txn)
458                    .await?;
459            }
460            Ok(count)
461        }
462
463        let mut batch = vec![];
464        for (table_id, _cg_id, committed_epoch) in tables_to_commit {
465            let version_id: u64 = delta.id.to_u64();
466            let m = hummock_epoch_to_version::ActiveModel {
467                epoch: Set(committed_epoch.try_into().unwrap()),
468                table_id: Set(table_id.table_id.into()),
469                version_id: Set(version_id.try_into().unwrap()),
470            };
471            batch.push(m);
472            if batch.len()
473                >= self
474                    .env
475                    .opts
476                    .hummock_time_travel_epoch_version_insert_batch_size
477            {
478                // There should be no conflict rows.
479                hummock_epoch_to_version::Entity::insert_many(std::mem::take(&mut batch))
480                    .do_nothing()
481                    .exec(txn)
482                    .await?;
483            }
484        }
485        if !batch.is_empty() {
486            // There should be no conflict rows.
487            hummock_epoch_to_version::Entity::insert_many(batch)
488                .do_nothing()
489                .exec(txn)
490                .await?;
491        }
492
493        let mut version_sst_ids = None;
494        if let Some(version) = version {
495            // `version_sst_ids` is used to update `last_time_travel_snapshot_sst_ids`.
496            version_sst_ids = Some(
497                version
498                    .get_sst_infos(true)
499                    .filter_map(|s| {
500                        if s.table_ids
501                            .iter()
502                            .any(|tid| time_travel_table_ids.contains(tid))
503                        {
504                            return Some(s.sst_id);
505                        }
506                        None
507                    })
508                    .collect(),
509            );
510            write_sstable_infos(
511                version.get_sst_infos(true).filter(|s| {
512                    !skip_sst_ids.contains(&s.sst_id)
513                        && s.table_ids
514                            .iter()
515                            .any(|tid| time_travel_table_ids.contains(tid))
516                }),
517                txn,
518                self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
519            )
520            .await?;
521            let m = hummock_time_travel_version::ActiveModel {
522                version_id: Set(risingwave_meta_model::HummockVersionId::try_from(
523                    version.id.to_u64(),
524                )
525                .unwrap()),
526                version: Set(
527                    (&IncompleteHummockVersion::from((version, &time_travel_table_ids))
528                        .to_protobuf())
529                        .into(),
530                ),
531            };
532            hummock_time_travel_version::Entity::insert(m)
533                .on_conflict_do_nothing()
534                .exec(txn)
535                .await?;
536            // Return early to skip persisting delta.
537            return Ok(version_sst_ids);
538        }
539        let written = write_sstable_infos(
540            delta.newly_added_sst_infos(true).filter(|s| {
541                !skip_sst_ids.contains(&s.sst_id)
542                    && s.table_ids
543                        .iter()
544                        .any(|tid| time_travel_table_ids.contains(tid))
545            }),
546            txn,
547            self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
548        )
549        .await?;
550        // Ignore delta which adds no data.
551        if written > 0 {
552            let m = hummock_time_travel_delta::ActiveModel {
553                version_id: Set(risingwave_meta_model::HummockVersionId::try_from(
554                    delta.id.to_u64(),
555                )
556                .unwrap()),
557                version_delta: Set((&IncompleteHummockVersionDelta::from((
558                    &delta,
559                    &time_travel_table_ids,
560                ))
561                .to_protobuf())
562                    .into()),
563            };
564            hummock_time_travel_delta::Entity::insert(m)
565                .on_conflict_do_nothing()
566                .exec(txn)
567                .await?;
568        }
569
570        Ok(version_sst_ids)
571    }
572}
573
574/// The `HummockVersion` is actually `InHummockVersion`. It requires `refill_version`.
575fn replay_archive(
576    version: PbHummockVersion,
577    deltas: impl Iterator<Item = PbHummockVersionDelta>,
578) -> HummockVersion {
579    // The pb version ann pb version delta are actually written by InHummockVersion and InHummockVersionDelta, respectively.
580    // Using HummockVersion make it easier for `refill_version` later.
581    let mut last_version = HummockVersion::from_persisted_protobuf(&version);
582    for d in deltas {
583        let d = HummockVersionDelta::from_persisted_protobuf(&d);
584        debug_assert!(
585            !should_mark_next_time_travel_version_snapshot(&d),
586            "unexpected time travel delta {:?}",
587            d
588        );
589        // Need to work around the assertion in `apply_version_delta`.
590        // Because compaction deltas are not included in time travel archive.
591        while last_version.id < d.prev_id {
592            last_version.id = last_version.id + 1;
593        }
594        last_version.apply_version_delta(&d);
595    }
596    last_version
597}
598
599pub fn require_sql_meta_store_err() -> Error {
600    Error::TimeTravel(anyhow!("require SQL meta store"))
601}
602
603/// Time travel delta replay only expect `NewL0SubLevel`. In all other cases, a new version snapshot should be created.
604pub fn should_mark_next_time_travel_version_snapshot(delta: &HummockVersionDelta) -> bool {
605    delta.group_deltas.iter().any(|(_, deltas)| {
606        deltas
607            .group_deltas
608            .iter()
609            .any(|d| !matches!(d, GroupDeltaCommon::NewL0SubLevel(_)))
610    })
611}