risingwave_meta/hummock/manager/
time_travel.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet, VecDeque};
16
17use anyhow::anyhow;
18use futures::TryStreamExt;
19use risingwave_common::catalog::TableId;
20use risingwave_common::system_param::reader::SystemParamsRead;
21use risingwave_common::util::epoch::Epoch;
22use risingwave_hummock_sdk::compaction_group::StateTableId;
23use risingwave_hummock_sdk::sstable_info::SstableInfo;
24use risingwave_hummock_sdk::time_travel::{
25    IncompleteHummockVersion, IncompleteHummockVersionDelta, refill_version,
26};
27use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion, HummockVersionDelta};
28use risingwave_hummock_sdk::{
29    CompactionGroupId, HummockEpoch, HummockObjectId, HummockSstableId, HummockSstableObjectId,
30};
31use risingwave_meta_model::hummock_sstable_info::SstableInfoV2Backend;
32use risingwave_meta_model::{
33    HummockVersionId, hummock_epoch_to_version, hummock_sstable_info, hummock_time_travel_delta,
34    hummock_time_travel_version,
35};
36use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta};
37use sea_orm::ActiveValue::Set;
38use sea_orm::{
39    ColumnTrait, Condition, DatabaseTransaction, EntityTrait, PaginatorTrait, QueryFilter,
40    QueryOrder, QuerySelect, TransactionTrait,
41};
42
43use crate::hummock::HummockManager;
44use crate::hummock::error::{Error, Result};
45
46/// Time travel.
47impl HummockManager {
48    pub(crate) async fn init_time_travel_state(&self) -> Result<()> {
49        let sql_store = self.env.meta_store_ref();
50        let mut guard = self.versioning.write().await;
51        guard.mark_next_time_travel_version_snapshot();
52
53        guard.last_time_travel_snapshot_sst_ids = HashSet::new();
54        let Some(version) = hummock_time_travel_version::Entity::find()
55            .order_by_desc(hummock_time_travel_version::Column::VersionId)
56            .one(&sql_store.conn)
57            .await?
58            .map(|v| IncompleteHummockVersion::from_persisted_protobuf(&v.version.to_protobuf()))
59        else {
60            return Ok(());
61        };
62        guard.last_time_travel_snapshot_sst_ids = version.get_sst_ids(true);
63        Ok(())
64    }
65
66    pub(crate) async fn truncate_time_travel_metadata(
67        &self,
68        epoch_watermark: HummockEpoch,
69    ) -> Result<()> {
70        let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
71        let sql_store = self.env.meta_store_ref();
72        let txn = sql_store.conn.begin().await?;
73        let version_watermark = hummock_epoch_to_version::Entity::find()
74            .filter(
75                hummock_epoch_to_version::Column::Epoch
76                    .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
77            )
78            .order_by_desc(hummock_epoch_to_version::Column::Epoch)
79            .order_by_asc(hummock_epoch_to_version::Column::VersionId)
80            .one(&txn)
81            .await?;
82        let Some(version_watermark) = version_watermark else {
83            txn.commit().await?;
84            return Ok(());
85        };
86        let watermark_version_id = std::cmp::min(
87            version_watermark.version_id,
88            min_pinned_version_id.to_u64().try_into().unwrap(),
89        );
90        let res = hummock_epoch_to_version::Entity::delete_many()
91            .filter(
92                hummock_epoch_to_version::Column::Epoch
93                    .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
94            )
95            .exec(&txn)
96            .await?;
97        tracing::debug!(
98            epoch_watermark,
99            "delete {} rows from hummock_epoch_to_version",
100            res.rows_affected
101        );
102        let latest_valid_version = hummock_time_travel_version::Entity::find()
103            .filter(hummock_time_travel_version::Column::VersionId.lte(watermark_version_id))
104            .order_by_desc(hummock_time_travel_version::Column::VersionId)
105            .one(&txn)
106            .await?
107            .map(|m| IncompleteHummockVersion::from_persisted_protobuf(&m.version.to_protobuf()));
108        let Some(latest_valid_version) = latest_valid_version else {
109            txn.commit().await?;
110            return Ok(());
111        };
112        let (
113            latest_valid_version_id,
114            latest_valid_version_sst_ids,
115            latest_valid_version_object_ids,
116        ) = {
117            (
118                latest_valid_version.id,
119                latest_valid_version.get_sst_ids(true),
120                latest_valid_version.get_object_ids(true),
121            )
122        };
123        let mut object_ids_to_delete: HashSet<_> = HashSet::default();
124        let version_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
125            hummock_time_travel_version::Entity::find()
126                .select_only()
127                .column(hummock_time_travel_version::Column::VersionId)
128                .filter(
129                    hummock_time_travel_version::Column::VersionId
130                        .lt(latest_valid_version_id.to_u64()),
131                )
132                .order_by_desc(hummock_time_travel_version::Column::VersionId)
133                .into_tuple()
134                .all(&txn)
135                .await?;
136        let delta_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
137            hummock_time_travel_delta::Entity::find()
138                .select_only()
139                .column(hummock_time_travel_delta::Column::VersionId)
140                .filter(
141                    hummock_time_travel_delta::Column::VersionId
142                        .lt(latest_valid_version_id.to_u64()),
143                )
144                .into_tuple()
145                .all(&txn)
146                .await?;
147        // Reuse hummock_time_travel_epoch_version_insert_batch_size as threshold.
148        let delete_sst_batch_size = self
149            .env
150            .opts
151            .hummock_time_travel_epoch_version_insert_batch_size;
152        let mut sst_ids_to_delete: HashSet<_> = HashSet::default();
153        async fn delete_sst_in_batch(
154            txn: &DatabaseTransaction,
155            sst_ids_to_delete: HashSet<HummockSstableId>,
156            delete_sst_batch_size: usize,
157        ) -> Result<()> {
158            for start_idx in 0..=(sst_ids_to_delete.len().saturating_sub(1) / delete_sst_batch_size)
159            {
160                hummock_sstable_info::Entity::delete_many()
161                    .filter(
162                        hummock_sstable_info::Column::SstId.is_in(
163                            sst_ids_to_delete
164                                .iter()
165                                .skip(start_idx * delete_sst_batch_size)
166                                .take(delete_sst_batch_size)
167                                .map(|sst_id| sst_id.inner()),
168                        ),
169                    )
170                    .exec(txn)
171                    .await?;
172            }
173            Ok(())
174        }
175        for delta_id_to_delete in delta_ids_to_delete {
176            let delta_to_delete = hummock_time_travel_delta::Entity::find_by_id(delta_id_to_delete)
177                .one(&txn)
178                .await?
179                .ok_or_else(|| {
180                    Error::TimeTravel(anyhow!(format!(
181                        "version delta {} not found",
182                        delta_id_to_delete
183                    )))
184                })?;
185            let delta_to_delete = IncompleteHummockVersionDelta::from_persisted_protobuf(
186                &delta_to_delete.version_delta.to_protobuf(),
187            );
188            let new_sst_ids = delta_to_delete.newly_added_sst_ids(true);
189            // The SST ids added and then deleted by compaction between the 2 versions.
190            sst_ids_to_delete.extend(&new_sst_ids - &latest_valid_version_sst_ids);
191            if sst_ids_to_delete.len() >= delete_sst_batch_size {
192                delete_sst_in_batch(
193                    &txn,
194                    std::mem::take(&mut sst_ids_to_delete),
195                    delete_sst_batch_size,
196                )
197                .await?;
198            }
199            let new_object_ids = delta_to_delete.newly_added_object_ids(true);
200            object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
201        }
202        let mut next_version_sst_ids = latest_valid_version_sst_ids;
203        for prev_version_id in version_ids_to_delete {
204            let prev_version = {
205                let prev_version = hummock_time_travel_version::Entity::find_by_id(prev_version_id)
206                    .one(&txn)
207                    .await?
208                    .ok_or_else(|| {
209                        Error::TimeTravel(anyhow!(format!(
210                            "prev_version {} not found",
211                            prev_version_id
212                        )))
213                    })?;
214                IncompleteHummockVersion::from_persisted_protobuf(
215                    &prev_version.version.to_protobuf(),
216                )
217            };
218            let sst_ids = prev_version.get_sst_ids(true);
219            // The SST ids deleted by compaction between the 2 versions.
220            sst_ids_to_delete.extend(&sst_ids - &next_version_sst_ids);
221            if sst_ids_to_delete.len() >= delete_sst_batch_size {
222                delete_sst_in_batch(
223                    &txn,
224                    std::mem::take(&mut sst_ids_to_delete),
225                    delete_sst_batch_size,
226                )
227                .await?;
228            }
229            let new_object_ids = prev_version.get_object_ids(true);
230            object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
231            next_version_sst_ids = sst_ids;
232        }
233        if !sst_ids_to_delete.is_empty() {
234            delete_sst_in_batch(&txn, sst_ids_to_delete, delete_sst_batch_size).await?;
235        }
236
237        if !object_ids_to_delete.is_empty() {
238            // IMPORTANT: object_ids_to_delete may include objects that are still being used by SSTs not included in time travel metadata.
239            // So it's crucial to filter out those objects before actually deleting them, i.e. when using `try_take_may_delete_object_ids`.
240            self.gc_manager
241                .add_may_delete_object_ids(object_ids_to_delete.into_iter());
242        }
243
244        let res = hummock_time_travel_version::Entity::delete_many()
245            .filter(
246                hummock_time_travel_version::Column::VersionId.lt(latest_valid_version_id.to_u64()),
247            )
248            .exec(&txn)
249            .await?;
250        tracing::debug!(
251            epoch_watermark_version_id = ?watermark_version_id,
252            ?latest_valid_version_id,
253            "delete {} rows from hummock_time_travel_version",
254            res.rows_affected
255        );
256
257        let res = hummock_time_travel_delta::Entity::delete_many()
258            .filter(
259                hummock_time_travel_delta::Column::VersionId.lt(latest_valid_version_id.to_u64()),
260            )
261            .exec(&txn)
262            .await?;
263        tracing::debug!(
264            epoch_watermark_version_id = ?watermark_version_id,
265            ?latest_valid_version_id,
266            "delete {} rows from hummock_time_travel_delta",
267            res.rows_affected
268        );
269
270        txn.commit().await?;
271        Ok(())
272    }
273
274    pub(crate) async fn filter_out_objects_by_time_travel_v1(
275        &self,
276        objects: impl Iterator<Item = HummockObjectId>,
277    ) -> Result<HashSet<HummockObjectId>> {
278        let batch_size = self
279            .env
280            .opts
281            .hummock_time_travel_filter_out_objects_batch_size;
282        // The input object count is much smaller than time travel pinned object count in meta store.
283        // So search input object in meta store.
284        let mut result: HashSet<_> = objects.collect();
285        let mut remain_sst: VecDeque<_> = result
286            .iter()
287            .map(|object_id| {
288                let HummockObjectId::Sstable(sst_id) = object_id;
289                *sst_id
290            })
291            .collect();
292        while !remain_sst.is_empty() {
293            let batch = remain_sst
294                .drain(..std::cmp::min(remain_sst.len(), batch_size))
295                .map(|object_id| object_id.as_raw().inner());
296            let reject_object_ids: Vec<risingwave_meta_model::HummockSstableObjectId> =
297                hummock_sstable_info::Entity::find()
298                    .filter(hummock_sstable_info::Column::ObjectId.is_in(batch))
299                    .select_only()
300                    .column(hummock_sstable_info::Column::ObjectId)
301                    .into_tuple()
302                    .all(&self.env.meta_store_ref().conn)
303                    .await?;
304            for reject in reject_object_ids {
305                // DO NOT REMOVE THIS LINE
306                // This is to ensure that when adding new variant to `HummockObjectId`,
307                // the compiler will warn us if we forget to handle it here.
308                match HummockObjectId::Sstable(0.into()) {
309                    HummockObjectId::Sstable(_) => {}
310                };
311                let reject: u64 = reject.try_into().unwrap();
312                let object_id = HummockObjectId::Sstable(HummockSstableObjectId::from(reject));
313                result.remove(&object_id);
314            }
315        }
316        Ok(result)
317    }
318
319    pub(crate) async fn filter_out_objects_by_time_travel(
320        &self,
321        objects: impl Iterator<Item = HummockObjectId>,
322    ) -> Result<HashSet<HummockObjectId>> {
323        if self.env.opts.hummock_time_travel_filter_out_objects_v1 {
324            return self.filter_out_objects_by_time_travel_v1(objects).await;
325        }
326        let mut result: HashSet<_> = objects.collect();
327
328        // filtered out object id pinned by time travel hummock version
329        {
330            let mut prev_version_id: Option<HummockVersionId> = None;
331            loop {
332                let query = hummock_time_travel_version::Entity::find();
333                let query = if let Some(prev_version_id) = prev_version_id {
334                    query.filter(hummock_time_travel_version::Column::VersionId.gt(prev_version_id))
335                } else {
336                    query
337                };
338                let mut version_stream = query
339                    .order_by_asc(hummock_time_travel_version::Column::VersionId)
340                    .limit(
341                        self.env
342                            .opts
343                            .hummock_time_travel_filter_out_objects_list_version_batch_size
344                            as u64,
345                    )
346                    .stream(&self.env.meta_store_ref().conn)
347                    .await?;
348                let mut next_prev_version_id = None;
349                while let Some(model) = version_stream.try_next().await? {
350                    let version =
351                        HummockVersion::from_persisted_protobuf(&model.version.to_protobuf());
352                    for object_id in version.get_object_ids(true) {
353                        result.remove(&object_id);
354                    }
355                    next_prev_version_id = Some(model.version_id);
356                }
357                if let Some(next_prev_version_id) = next_prev_version_id {
358                    prev_version_id = Some(next_prev_version_id);
359                } else {
360                    break;
361                }
362            }
363        }
364
365        // filtered out object ids pinned by time travel hummock version delta
366        {
367            let mut prev_version_id: Option<HummockVersionId> = None;
368            loop {
369                let query = hummock_time_travel_delta::Entity::find();
370                let query = if let Some(prev_version_id) = prev_version_id {
371                    query.filter(hummock_time_travel_delta::Column::VersionId.gt(prev_version_id))
372                } else {
373                    query
374                };
375                let mut version_stream = query
376                    .order_by_asc(hummock_time_travel_delta::Column::VersionId)
377                    .limit(
378                        self.env
379                            .opts
380                            .hummock_time_travel_filter_out_objects_list_delta_batch_size
381                            as u64,
382                    )
383                    .stream(&self.env.meta_store_ref().conn)
384                    .await?;
385                let mut next_prev_version_id = None;
386                while let Some(model) = version_stream.try_next().await? {
387                    let version_delta = HummockVersionDelta::from_persisted_protobuf(
388                        &model.version_delta.to_protobuf(),
389                    );
390                    // set exclude_table_change_log to true because in time travel delta we ignore the table change log
391                    for object_id in version_delta.newly_added_object_ids(true) {
392                        result.remove(&object_id);
393                    }
394                    next_prev_version_id = Some(model.version_id);
395                }
396                if let Some(next_prev_version_id) = next_prev_version_id {
397                    prev_version_id = Some(next_prev_version_id);
398                } else {
399                    break;
400                }
401            }
402        }
403
404        Ok(result)
405    }
406
407    pub(crate) async fn time_travel_pinned_object_count(&self) -> Result<u64> {
408        let count = hummock_sstable_info::Entity::find()
409            .count(&self.env.meta_store_ref().conn)
410            .await?;
411        Ok(count)
412    }
413
414    /// Attempt to locate the version corresponding to `query_epoch`.
415    ///
416    /// The version is retrieved from `hummock_epoch_to_version`, selecting the entry with the largest epoch that's lte `query_epoch`.
417    ///
418    /// The resulted version is complete, i.e. with correct `SstableInfo`.
419    pub async fn epoch_to_version(
420        &self,
421        query_epoch: HummockEpoch,
422        table_id: u32,
423    ) -> Result<HummockVersion> {
424        let sql_store = self.env.meta_store_ref();
425        let _permit = self.inflight_time_travel_query.try_acquire().map_err(|_| {
426            anyhow!(format!(
427                "too many inflight time travel queries, max_inflight_time_travel_query={}",
428                self.env.opts.max_inflight_time_travel_query
429            ))
430        })?;
431        let epoch_to_version = hummock_epoch_to_version::Entity::find()
432            .filter(
433                Condition::any()
434                    .add(hummock_epoch_to_version::Column::TableId.eq(i64::from(table_id)))
435                    // for backward compatibility
436                    .add(hummock_epoch_to_version::Column::TableId.eq(0)),
437            )
438            .filter(
439                hummock_epoch_to_version::Column::Epoch
440                    .lte(risingwave_meta_model::Epoch::try_from(query_epoch).unwrap()),
441            )
442            .order_by_desc(hummock_epoch_to_version::Column::Epoch)
443            .one(&sql_store.conn)
444            .await?
445            .ok_or_else(|| {
446                Error::TimeTravel(anyhow!(format!(
447                    "version not found for epoch {}",
448                    query_epoch
449                )))
450            })?;
451        let timer = self
452            .metrics
453            .time_travel_version_replay_latency
454            .start_timer();
455        let actual_version_id = epoch_to_version.version_id;
456        tracing::debug!(
457            query_epoch,
458            query_tz = ?(Epoch(query_epoch).as_timestamptz()),
459            actual_epoch = epoch_to_version.epoch,
460            actual_tz = ?(Epoch(u64::try_from(epoch_to_version.epoch).unwrap()).as_timestamptz()),
461            actual_version_id,
462            "convert query epoch"
463        );
464
465        let replay_version = hummock_time_travel_version::Entity::find()
466            .filter(hummock_time_travel_version::Column::VersionId.lte(actual_version_id))
467            .order_by_desc(hummock_time_travel_version::Column::VersionId)
468            .one(&sql_store.conn)
469            .await?
470            .ok_or_else(|| {
471                Error::TimeTravel(anyhow!(format!(
472                    "no replay version found for epoch {}, version {}",
473                    query_epoch, actual_version_id,
474                )))
475            })?;
476        let deltas = hummock_time_travel_delta::Entity::find()
477            .filter(hummock_time_travel_delta::Column::VersionId.gt(replay_version.version_id))
478            .filter(hummock_time_travel_delta::Column::VersionId.lte(actual_version_id))
479            .order_by_asc(hummock_time_travel_delta::Column::VersionId)
480            .all(&sql_store.conn)
481            .await?;
482        // SstableInfo in actual_version is incomplete before refill_version.
483        let mut actual_version = replay_archive(
484            replay_version.version.to_protobuf(),
485            deltas.into_iter().map(|d| d.version_delta.to_protobuf()),
486        );
487
488        let mut sst_ids = actual_version
489            .get_sst_ids(true)
490            .into_iter()
491            .collect::<VecDeque<_>>();
492        let sst_count = sst_ids.len();
493        let mut sst_id_to_info = HashMap::with_capacity(sst_count);
494        let sst_info_fetch_batch_size = self.env.opts.hummock_time_travel_sst_info_fetch_batch_size;
495        while !sst_ids.is_empty() {
496            let sst_infos = hummock_sstable_info::Entity::find()
497                .filter(
498                    hummock_sstable_info::Column::SstId.is_in(
499                        sst_ids
500                            .drain(..std::cmp::min(sst_info_fetch_batch_size, sst_ids.len()))
501                            .map(|sst_id| sst_id.inner()),
502                    ),
503                )
504                .all(&sql_store.conn)
505                .await?;
506            for sst_info in sst_infos {
507                let sst_info: SstableInfo = sst_info.sstable_info.to_protobuf().into();
508                sst_id_to_info.insert(sst_info.sst_id, sst_info);
509            }
510        }
511        if sst_count != sst_id_to_info.len() {
512            return Err(Error::TimeTravel(anyhow!(format!(
513                "some SstableInfos not found for epoch {}, version {}",
514                query_epoch, actual_version_id,
515            ))));
516        }
517        refill_version(&mut actual_version, &sst_id_to_info, table_id);
518        timer.observe_duration();
519        Ok(actual_version)
520    }
521
522    pub(crate) async fn write_time_travel_metadata(
523        &self,
524        txn: &DatabaseTransaction,
525        version: Option<&HummockVersion>,
526        delta: HummockVersionDelta,
527        time_travel_table_ids: HashSet<StateTableId>,
528        skip_sst_ids: &HashSet<HummockSstableId>,
529        tables_to_commit: impl Iterator<Item = (&TableId, &CompactionGroupId, u64)>,
530    ) -> Result<Option<HashSet<HummockSstableId>>> {
531        if self
532            .env
533            .system_params_reader()
534            .await
535            .time_travel_retention_ms()
536            == 0
537        {
538            return Ok(None);
539        }
540        async fn write_sstable_infos(
541            mut sst_infos: impl Iterator<Item = &SstableInfo>,
542            txn: &DatabaseTransaction,
543            batch_size: usize,
544        ) -> Result<usize> {
545            let mut count = 0;
546            let mut is_finished = false;
547            while !is_finished {
548                let mut remain = batch_size;
549                let mut batch = vec![];
550                while remain > 0 {
551                    let Some(sst_info) = sst_infos.next() else {
552                        is_finished = true;
553                        break;
554                    };
555                    batch.push(hummock_sstable_info::ActiveModel {
556                        sst_id: Set(sst_info.sst_id.inner().try_into().unwrap()),
557                        object_id: Set(sst_info.object_id.inner().try_into().unwrap()),
558                        sstable_info: Set(SstableInfoV2Backend::from(&sst_info.to_protobuf())),
559                    });
560                    remain -= 1;
561                    count += 1;
562                }
563                if batch.is_empty() {
564                    break;
565                }
566                hummock_sstable_info::Entity::insert_many(batch)
567                    .on_conflict_do_nothing()
568                    .exec(txn)
569                    .await?;
570            }
571            Ok(count)
572        }
573
574        let mut batch = vec![];
575        for (table_id, _cg_id, committed_epoch) in tables_to_commit {
576            let version_id: u64 = delta.id.to_u64();
577            let m = hummock_epoch_to_version::ActiveModel {
578                epoch: Set(committed_epoch.try_into().unwrap()),
579                table_id: Set(table_id.table_id.into()),
580                version_id: Set(version_id.try_into().unwrap()),
581            };
582            batch.push(m);
583            if batch.len()
584                >= self
585                    .env
586                    .opts
587                    .hummock_time_travel_epoch_version_insert_batch_size
588            {
589                // There should be no conflict rows.
590                hummock_epoch_to_version::Entity::insert_many(std::mem::take(&mut batch))
591                    .do_nothing()
592                    .exec(txn)
593                    .await?;
594            }
595        }
596        if !batch.is_empty() {
597            // There should be no conflict rows.
598            hummock_epoch_to_version::Entity::insert_many(batch)
599                .do_nothing()
600                .exec(txn)
601                .await?;
602        }
603
604        let mut version_sst_ids = None;
605        if let Some(version) = version {
606            // `version_sst_ids` is used to update `last_time_travel_snapshot_sst_ids`.
607            version_sst_ids = Some(
608                version
609                    .get_sst_infos(true)
610                    .filter_map(|s| {
611                        if s.table_ids
612                            .iter()
613                            .any(|tid| time_travel_table_ids.contains(tid))
614                        {
615                            return Some(s.sst_id);
616                        }
617                        None
618                    })
619                    .collect(),
620            );
621            write_sstable_infos(
622                version.get_sst_infos(true).filter(|s| {
623                    !skip_sst_ids.contains(&s.sst_id)
624                        && s.table_ids
625                            .iter()
626                            .any(|tid| time_travel_table_ids.contains(tid))
627                }),
628                txn,
629                self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
630            )
631            .await?;
632            let m = hummock_time_travel_version::ActiveModel {
633                version_id: Set(risingwave_meta_model::HummockVersionId::try_from(
634                    version.id.to_u64(),
635                )
636                .unwrap()),
637                version: Set(
638                    (&IncompleteHummockVersion::from((version, &time_travel_table_ids))
639                        .to_protobuf())
640                        .into(),
641                ),
642            };
643            hummock_time_travel_version::Entity::insert(m)
644                .on_conflict_do_nothing()
645                .exec(txn)
646                .await?;
647            // Return early to skip persisting delta.
648            return Ok(version_sst_ids);
649        }
650        let written = write_sstable_infos(
651            delta.newly_added_sst_infos(true).filter(|s| {
652                !skip_sst_ids.contains(&s.sst_id)
653                    && s.table_ids
654                        .iter()
655                        .any(|tid| time_travel_table_ids.contains(tid))
656            }),
657            txn,
658            self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
659        )
660        .await?;
661        // Ignore delta which adds no data.
662        if written > 0 {
663            let m = hummock_time_travel_delta::ActiveModel {
664                version_id: Set(risingwave_meta_model::HummockVersionId::try_from(
665                    delta.id.to_u64(),
666                )
667                .unwrap()),
668                version_delta: Set((&IncompleteHummockVersionDelta::from((
669                    &delta,
670                    &time_travel_table_ids,
671                ))
672                .to_protobuf())
673                    .into()),
674            };
675            hummock_time_travel_delta::Entity::insert(m)
676                .on_conflict_do_nothing()
677                .exec(txn)
678                .await?;
679        }
680
681        Ok(version_sst_ids)
682    }
683}
684
685/// The `HummockVersion` is actually `InHummockVersion`. It requires `refill_version`.
686fn replay_archive(
687    version: PbHummockVersion,
688    deltas: impl Iterator<Item = PbHummockVersionDelta>,
689) -> HummockVersion {
690    // The pb version ann pb version delta are actually written by InHummockVersion and InHummockVersionDelta, respectively.
691    // Using HummockVersion make it easier for `refill_version` later.
692    let mut last_version = HummockVersion::from_persisted_protobuf(&version);
693    for d in deltas {
694        let d = HummockVersionDelta::from_persisted_protobuf(&d);
695        debug_assert!(
696            !should_mark_next_time_travel_version_snapshot(&d),
697            "unexpected time travel delta {:?}",
698            d
699        );
700        // Need to work around the assertion in `apply_version_delta`.
701        // Because compaction deltas are not included in time travel archive.
702        while last_version.id < d.prev_id {
703            last_version.id = last_version.id + 1;
704        }
705        last_version.apply_version_delta(&d);
706    }
707    last_version
708}
709
710pub fn require_sql_meta_store_err() -> Error {
711    Error::TimeTravel(anyhow!("require SQL meta store"))
712}
713
714/// Time travel delta replay only expect `NewL0SubLevel`. In all other cases, a new version snapshot should be created.
715pub fn should_mark_next_time_travel_version_snapshot(delta: &HummockVersionDelta) -> bool {
716    delta.group_deltas.iter().any(|(_, deltas)| {
717        deltas
718            .group_deltas
719            .iter()
720            .any(|d| !matches!(d, GroupDeltaCommon::NewL0SubLevel(_)))
721    })
722}