risingwave_meta/hummock/manager/
time_travel.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet, VecDeque};
16
17use anyhow::anyhow;
18use futures::TryStreamExt;
19use risingwave_common::catalog::TableId;
20use risingwave_common::system_param::reader::SystemParamsRead;
21use risingwave_common::util::epoch::Epoch;
22use risingwave_hummock_sdk::compaction_group::StateTableId;
23use risingwave_hummock_sdk::sstable_info::SstableInfo;
24use risingwave_hummock_sdk::time_travel::{
25    IncompleteHummockVersion, IncompleteHummockVersionDelta, refill_version,
26};
27use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion, HummockVersionDelta};
28use risingwave_hummock_sdk::{
29    CompactionGroupId, HummockEpoch, HummockObjectId, HummockSstableId, HummockSstableObjectId,
30};
31use risingwave_meta_model::hummock_sstable_info::SstableInfoV2Backend;
32use risingwave_meta_model::{
33    HummockVersionId, hummock_epoch_to_version, hummock_sstable_info, hummock_time_travel_delta,
34    hummock_time_travel_version,
35};
36use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta};
37use sea_orm::ActiveValue::Set;
38use sea_orm::{
39    ColumnTrait, Condition, DatabaseTransaction, EntityTrait, PaginatorTrait, QueryFilter,
40    QueryOrder, QuerySelect, TransactionTrait,
41};
42use tracing::info;
43
44use crate::hummock::HummockManager;
45use crate::hummock::error::{Error, Result};
46
47/// Time travel.
48impl HummockManager {
49    pub(crate) async fn init_time_travel_state(&self) -> Result<()> {
50        let sql_store = self.env.meta_store_ref();
51        let mut guard = self.versioning.write().await;
52        guard.mark_next_time_travel_version_snapshot();
53
54        guard.last_time_travel_snapshot_sst_ids = HashSet::new();
55        let Some(version) = hummock_time_travel_version::Entity::find()
56            .order_by_desc(hummock_time_travel_version::Column::VersionId)
57            .one(&sql_store.conn)
58            .await?
59            .map(|v| IncompleteHummockVersion::from_persisted_protobuf(&v.version.to_protobuf()))
60        else {
61            return Ok(());
62        };
63        guard.last_time_travel_snapshot_sst_ids = version.get_sst_ids(true);
64        Ok(())
65    }
66
67    pub(crate) async fn truncate_time_travel_metadata(
68        &self,
69        epoch_watermark: HummockEpoch,
70    ) -> Result<()> {
71        let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
72        let sql_store = self.env.meta_store_ref();
73        let txn = sql_store.conn.begin().await?;
74        let version_watermark = hummock_epoch_to_version::Entity::find()
75            .filter(
76                hummock_epoch_to_version::Column::Epoch
77                    .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
78            )
79            .order_by_desc(hummock_epoch_to_version::Column::Epoch)
80            .order_by_asc(hummock_epoch_to_version::Column::VersionId)
81            .one(&txn)
82            .await?;
83        let Some(version_watermark) = version_watermark else {
84            txn.commit().await?;
85            return Ok(());
86        };
87        let watermark_version_id = std::cmp::min(
88            version_watermark.version_id,
89            min_pinned_version_id.to_u64().try_into().unwrap(),
90        );
91        let res = hummock_epoch_to_version::Entity::delete_many()
92            .filter(
93                hummock_epoch_to_version::Column::Epoch
94                    .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
95            )
96            .exec(&txn)
97            .await?;
98        tracing::debug!(
99            epoch_watermark,
100            "delete {} rows from hummock_epoch_to_version",
101            res.rows_affected
102        );
103        let latest_valid_version = hummock_time_travel_version::Entity::find()
104            .filter(hummock_time_travel_version::Column::VersionId.lte(watermark_version_id))
105            .order_by_desc(hummock_time_travel_version::Column::VersionId)
106            .one(&txn)
107            .await?
108            .map(|m| IncompleteHummockVersion::from_persisted_protobuf(&m.version.to_protobuf()));
109        let Some(latest_valid_version) = latest_valid_version else {
110            txn.commit().await?;
111            return Ok(());
112        };
113        let (
114            latest_valid_version_id,
115            latest_valid_version_sst_ids,
116            latest_valid_version_object_ids,
117        ) = {
118            (
119                latest_valid_version.id,
120                latest_valid_version.get_sst_ids(true),
121                latest_valid_version
122                    .get_object_ids(true)
123                    .collect::<HashSet<_>>(),
124            )
125        };
126        let mut object_ids_to_delete: HashSet<_> = HashSet::default();
127        let version_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
128            hummock_time_travel_version::Entity::find()
129                .select_only()
130                .column(hummock_time_travel_version::Column::VersionId)
131                .filter(
132                    hummock_time_travel_version::Column::VersionId
133                        .lt(latest_valid_version_id.to_u64()),
134                )
135                .order_by_desc(hummock_time_travel_version::Column::VersionId)
136                .into_tuple()
137                .all(&txn)
138                .await?;
139        let delta_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
140            hummock_time_travel_delta::Entity::find()
141                .select_only()
142                .column(hummock_time_travel_delta::Column::VersionId)
143                .filter(
144                    hummock_time_travel_delta::Column::VersionId
145                        .lt(latest_valid_version_id.to_u64()),
146                )
147                .into_tuple()
148                .all(&txn)
149                .await?;
150        // Reuse hummock_time_travel_epoch_version_insert_batch_size as threshold.
151        let delete_sst_batch_size = self
152            .env
153            .opts
154            .hummock_time_travel_epoch_version_insert_batch_size;
155        let mut sst_ids_to_delete: HashSet<_> = HashSet::default();
156        async fn delete_sst_in_batch(
157            txn: &DatabaseTransaction,
158            sst_ids_to_delete: HashSet<HummockSstableId>,
159            delete_sst_batch_size: usize,
160        ) -> Result<()> {
161            for start_idx in 0..=(sst_ids_to_delete.len().saturating_sub(1) / delete_sst_batch_size)
162            {
163                hummock_sstable_info::Entity::delete_many()
164                    .filter(
165                        hummock_sstable_info::Column::SstId.is_in(
166                            sst_ids_to_delete
167                                .iter()
168                                .skip(start_idx * delete_sst_batch_size)
169                                .take(delete_sst_batch_size)
170                                .map(|sst_id| sst_id.inner()),
171                        ),
172                    )
173                    .exec(txn)
174                    .await?;
175            }
176            Ok(())
177        }
178        for delta_id_to_delete in delta_ids_to_delete {
179            let delta_to_delete = hummock_time_travel_delta::Entity::find_by_id(delta_id_to_delete)
180                .one(&txn)
181                .await?
182                .ok_or_else(|| {
183                    Error::TimeTravel(anyhow!(format!(
184                        "version delta {} not found",
185                        delta_id_to_delete
186                    )))
187                })?;
188            let delta_to_delete = IncompleteHummockVersionDelta::from_persisted_protobuf(
189                &delta_to_delete.version_delta.to_protobuf(),
190            );
191            let new_sst_ids = delta_to_delete.newly_added_sst_ids(true);
192            // The SST ids added and then deleted by compaction between the 2 versions.
193            sst_ids_to_delete.extend(&new_sst_ids - &latest_valid_version_sst_ids);
194            if sst_ids_to_delete.len() >= delete_sst_batch_size {
195                delete_sst_in_batch(
196                    &txn,
197                    std::mem::take(&mut sst_ids_to_delete),
198                    delete_sst_batch_size,
199                )
200                .await?;
201            }
202            let new_object_ids = delta_to_delete.newly_added_object_ids(true);
203            object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
204        }
205        let mut next_version_sst_ids = latest_valid_version_sst_ids;
206        for prev_version_id in version_ids_to_delete {
207            let prev_version = {
208                let prev_version = hummock_time_travel_version::Entity::find_by_id(prev_version_id)
209                    .one(&txn)
210                    .await?
211                    .ok_or_else(|| {
212                        Error::TimeTravel(anyhow!(format!(
213                            "prev_version {} not found",
214                            prev_version_id
215                        )))
216                    })?;
217                IncompleteHummockVersion::from_persisted_protobuf(
218                    &prev_version.version.to_protobuf(),
219                )
220            };
221            let sst_ids = prev_version.get_sst_ids(true);
222            // The SST ids deleted by compaction between the 2 versions.
223            sst_ids_to_delete.extend(&sst_ids - &next_version_sst_ids);
224            if sst_ids_to_delete.len() >= delete_sst_batch_size {
225                delete_sst_in_batch(
226                    &txn,
227                    std::mem::take(&mut sst_ids_to_delete),
228                    delete_sst_batch_size,
229                )
230                .await?;
231            }
232            let new_object_ids: HashSet<_> = prev_version.get_object_ids(true).collect();
233            object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
234            next_version_sst_ids = sst_ids;
235        }
236        if !sst_ids_to_delete.is_empty() {
237            delete_sst_in_batch(&txn, sst_ids_to_delete, delete_sst_batch_size).await?;
238        }
239
240        if !object_ids_to_delete.is_empty() {
241            // IMPORTANT: object_ids_to_delete may include objects that are still being used by SSTs not included in time travel metadata.
242            // So it's crucial to filter out those objects before actually deleting them, i.e. when using `try_take_may_delete_object_ids`.
243            self.gc_manager
244                .add_may_delete_object_ids(object_ids_to_delete.into_iter());
245        }
246
247        let res = hummock_time_travel_version::Entity::delete_many()
248            .filter(
249                hummock_time_travel_version::Column::VersionId.lt(latest_valid_version_id.to_u64()),
250            )
251            .exec(&txn)
252            .await?;
253        tracing::debug!(
254            epoch_watermark_version_id = ?watermark_version_id,
255            ?latest_valid_version_id,
256            "delete {} rows from hummock_time_travel_version",
257            res.rows_affected
258        );
259
260        let res = hummock_time_travel_delta::Entity::delete_many()
261            .filter(
262                hummock_time_travel_delta::Column::VersionId.lt(latest_valid_version_id.to_u64()),
263            )
264            .exec(&txn)
265            .await?;
266        tracing::debug!(
267            epoch_watermark_version_id = ?watermark_version_id,
268            ?latest_valid_version_id,
269            "delete {} rows from hummock_time_travel_delta",
270            res.rows_affected
271        );
272
273        txn.commit().await?;
274        Ok(())
275    }
276
277    pub(crate) async fn filter_out_objects_by_time_travel_v1(
278        &self,
279        objects: impl Iterator<Item = HummockObjectId>,
280    ) -> Result<HashSet<HummockObjectId>> {
281        let batch_size = self
282            .env
283            .opts
284            .hummock_time_travel_filter_out_objects_batch_size;
285        info!("filter out objects by time travel v1, only sst will remain in the result set");
286        // The input object count is much smaller than time travel pinned object count in meta store.
287        // So search input object in meta store.
288        let mut result: HashSet<_> = objects
289            .filter(|object_id| match object_id {
290                HummockObjectId::Sstable(_) => true,
291                HummockObjectId::VectorFile(_) => false,
292            })
293            .collect();
294        let mut remain_sst: VecDeque<_> = result.iter().copied().collect();
295        while !remain_sst.is_empty() {
296            let batch = remain_sst
297                .drain(..std::cmp::min(remain_sst.len(), batch_size))
298                .map(|object_id| object_id.as_raw().inner());
299            let reject_object_ids: Vec<risingwave_meta_model::HummockSstableObjectId> =
300                hummock_sstable_info::Entity::find()
301                    .filter(hummock_sstable_info::Column::ObjectId.is_in(batch))
302                    .select_only()
303                    .column(hummock_sstable_info::Column::ObjectId)
304                    .into_tuple()
305                    .all(&self.env.meta_store_ref().conn)
306                    .await?;
307            for reject in reject_object_ids {
308                let reject: u64 = reject.try_into().unwrap();
309                let object_id = HummockObjectId::Sstable(HummockSstableObjectId::from(reject));
310                result.remove(&object_id);
311            }
312        }
313        Ok(result)
314    }
315
316    pub(crate) async fn filter_out_objects_by_time_travel(
317        &self,
318        objects: impl Iterator<Item = HummockObjectId>,
319    ) -> Result<HashSet<HummockObjectId>> {
320        if self.env.opts.hummock_time_travel_filter_out_objects_v1 {
321            return self.filter_out_objects_by_time_travel_v1(objects).await;
322        }
323        let mut result: HashSet<_> = objects.collect();
324
325        // filtered out object id pinned by time travel hummock version
326        {
327            let mut prev_version_id: Option<HummockVersionId> = None;
328            loop {
329                let query = hummock_time_travel_version::Entity::find();
330                let query = if let Some(prev_version_id) = prev_version_id {
331                    query.filter(hummock_time_travel_version::Column::VersionId.gt(prev_version_id))
332                } else {
333                    query
334                };
335                let mut version_stream = query
336                    .order_by_asc(hummock_time_travel_version::Column::VersionId)
337                    .limit(
338                        self.env
339                            .opts
340                            .hummock_time_travel_filter_out_objects_list_version_batch_size
341                            as u64,
342                    )
343                    .stream(&self.env.meta_store_ref().conn)
344                    .await?;
345                let mut next_prev_version_id = None;
346                while let Some(model) = version_stream.try_next().await? {
347                    let version =
348                        HummockVersion::from_persisted_protobuf(&model.version.to_protobuf());
349                    for object_id in version.get_object_ids(true) {
350                        result.remove(&object_id);
351                    }
352                    next_prev_version_id = Some(model.version_id);
353                }
354                if let Some(next_prev_version_id) = next_prev_version_id {
355                    prev_version_id = Some(next_prev_version_id);
356                } else {
357                    break;
358                }
359            }
360        }
361
362        // filtered out object ids pinned by time travel hummock version delta
363        {
364            let mut prev_version_id: Option<HummockVersionId> = None;
365            loop {
366                let query = hummock_time_travel_delta::Entity::find();
367                let query = if let Some(prev_version_id) = prev_version_id {
368                    query.filter(hummock_time_travel_delta::Column::VersionId.gt(prev_version_id))
369                } else {
370                    query
371                };
372                let mut version_stream = query
373                    .order_by_asc(hummock_time_travel_delta::Column::VersionId)
374                    .limit(
375                        self.env
376                            .opts
377                            .hummock_time_travel_filter_out_objects_list_delta_batch_size
378                            as u64,
379                    )
380                    .stream(&self.env.meta_store_ref().conn)
381                    .await?;
382                let mut next_prev_version_id = None;
383                while let Some(model) = version_stream.try_next().await? {
384                    let version_delta = HummockVersionDelta::from_persisted_protobuf(
385                        &model.version_delta.to_protobuf(),
386                    );
387                    // set exclude_table_change_log to true because in time travel delta we ignore the table change log
388                    for object_id in version_delta.newly_added_object_ids(true) {
389                        result.remove(&object_id);
390                    }
391                    next_prev_version_id = Some(model.version_id);
392                }
393                if let Some(next_prev_version_id) = next_prev_version_id {
394                    prev_version_id = Some(next_prev_version_id);
395                } else {
396                    break;
397                }
398            }
399        }
400
401        Ok(result)
402    }
403
404    pub(crate) async fn time_travel_pinned_object_count(&self) -> Result<u64> {
405        let count = hummock_sstable_info::Entity::find()
406            .count(&self.env.meta_store_ref().conn)
407            .await?;
408        Ok(count)
409    }
410
411    /// Attempt to locate the version corresponding to `query_epoch`.
412    ///
413    /// The version is retrieved from `hummock_epoch_to_version`, selecting the entry with the largest epoch that's lte `query_epoch`.
414    ///
415    /// The resulted version is complete, i.e. with correct `SstableInfo`.
416    pub async fn epoch_to_version(
417        &self,
418        query_epoch: HummockEpoch,
419        table_id: u32,
420    ) -> Result<HummockVersion> {
421        let sql_store = self.env.meta_store_ref();
422        let _permit = self.inflight_time_travel_query.try_acquire().map_err(|_| {
423            anyhow!(format!(
424                "too many inflight time travel queries, max_inflight_time_travel_query={}",
425                self.env.opts.max_inflight_time_travel_query
426            ))
427        })?;
428        let epoch_to_version = hummock_epoch_to_version::Entity::find()
429            .filter(
430                Condition::any()
431                    .add(hummock_epoch_to_version::Column::TableId.eq(i64::from(table_id)))
432                    // for backward compatibility
433                    .add(hummock_epoch_to_version::Column::TableId.eq(0)),
434            )
435            .filter(
436                hummock_epoch_to_version::Column::Epoch
437                    .lte(risingwave_meta_model::Epoch::try_from(query_epoch).unwrap()),
438            )
439            .order_by_desc(hummock_epoch_to_version::Column::Epoch)
440            .one(&sql_store.conn)
441            .await?
442            .ok_or_else(|| {
443                Error::TimeTravel(anyhow!(format!(
444                    "version not found for epoch {}",
445                    query_epoch
446                )))
447            })?;
448        let timer = self
449            .metrics
450            .time_travel_version_replay_latency
451            .start_timer();
452        let actual_version_id = epoch_to_version.version_id;
453        tracing::debug!(
454            query_epoch,
455            query_tz = ?(Epoch(query_epoch).as_timestamptz()),
456            actual_epoch = epoch_to_version.epoch,
457            actual_tz = ?(Epoch(u64::try_from(epoch_to_version.epoch).unwrap()).as_timestamptz()),
458            actual_version_id,
459            "convert query epoch"
460        );
461
462        let replay_version = hummock_time_travel_version::Entity::find()
463            .filter(hummock_time_travel_version::Column::VersionId.lte(actual_version_id))
464            .order_by_desc(hummock_time_travel_version::Column::VersionId)
465            .one(&sql_store.conn)
466            .await?
467            .ok_or_else(|| {
468                Error::TimeTravel(anyhow!(format!(
469                    "no replay version found for epoch {}, version {}",
470                    query_epoch, actual_version_id,
471                )))
472            })?;
473        let deltas = hummock_time_travel_delta::Entity::find()
474            .filter(hummock_time_travel_delta::Column::VersionId.gt(replay_version.version_id))
475            .filter(hummock_time_travel_delta::Column::VersionId.lte(actual_version_id))
476            .order_by_asc(hummock_time_travel_delta::Column::VersionId)
477            .all(&sql_store.conn)
478            .await?;
479        // SstableInfo in actual_version is incomplete before refill_version.
480        let mut actual_version = replay_archive(
481            replay_version.version.to_protobuf(),
482            deltas.into_iter().map(|d| d.version_delta.to_protobuf()),
483        );
484
485        let mut sst_ids = actual_version
486            .get_sst_ids(true)
487            .into_iter()
488            .collect::<VecDeque<_>>();
489        let sst_count = sst_ids.len();
490        let mut sst_id_to_info = HashMap::with_capacity(sst_count);
491        let sst_info_fetch_batch_size = self.env.opts.hummock_time_travel_sst_info_fetch_batch_size;
492        while !sst_ids.is_empty() {
493            let sst_infos = hummock_sstable_info::Entity::find()
494                .filter(
495                    hummock_sstable_info::Column::SstId.is_in(
496                        sst_ids
497                            .drain(..std::cmp::min(sst_info_fetch_batch_size, sst_ids.len()))
498                            .map(|sst_id| sst_id.inner()),
499                    ),
500                )
501                .all(&sql_store.conn)
502                .await?;
503            for sst_info in sst_infos {
504                let sst_info: SstableInfo = sst_info.sstable_info.to_protobuf().into();
505                sst_id_to_info.insert(sst_info.sst_id, sst_info);
506            }
507        }
508        if sst_count != sst_id_to_info.len() {
509            return Err(Error::TimeTravel(anyhow!(format!(
510                "some SstableInfos not found for epoch {}, version {}",
511                query_epoch, actual_version_id,
512            ))));
513        }
514        refill_version(&mut actual_version, &sst_id_to_info, table_id);
515        timer.observe_duration();
516        Ok(actual_version)
517    }
518
519    pub(crate) async fn write_time_travel_metadata(
520        &self,
521        txn: &DatabaseTransaction,
522        version: Option<&HummockVersion>,
523        delta: HummockVersionDelta,
524        time_travel_table_ids: HashSet<StateTableId>,
525        skip_sst_ids: &HashSet<HummockSstableId>,
526        tables_to_commit: impl Iterator<Item = (&TableId, &CompactionGroupId, u64)>,
527    ) -> Result<Option<HashSet<HummockSstableId>>> {
528        if self
529            .env
530            .system_params_reader()
531            .await
532            .time_travel_retention_ms()
533            == 0
534        {
535            return Ok(None);
536        }
537        async fn write_sstable_infos(
538            mut sst_infos: impl Iterator<Item = &SstableInfo>,
539            txn: &DatabaseTransaction,
540            batch_size: usize,
541        ) -> Result<usize> {
542            let mut count = 0;
543            let mut is_finished = false;
544            while !is_finished {
545                let mut remain = batch_size;
546                let mut batch = vec![];
547                while remain > 0 {
548                    let Some(sst_info) = sst_infos.next() else {
549                        is_finished = true;
550                        break;
551                    };
552                    batch.push(hummock_sstable_info::ActiveModel {
553                        sst_id: Set(sst_info.sst_id.inner().try_into().unwrap()),
554                        object_id: Set(sst_info.object_id.inner().try_into().unwrap()),
555                        sstable_info: Set(SstableInfoV2Backend::from(&sst_info.to_protobuf())),
556                    });
557                    remain -= 1;
558                    count += 1;
559                }
560                if batch.is_empty() {
561                    break;
562                }
563                hummock_sstable_info::Entity::insert_many(batch)
564                    .on_conflict_do_nothing()
565                    .exec(txn)
566                    .await?;
567            }
568            Ok(count)
569        }
570
571        let mut batch = vec![];
572        for (table_id, _cg_id, committed_epoch) in tables_to_commit {
573            let version_id: u64 = delta.id.to_u64();
574            let m = hummock_epoch_to_version::ActiveModel {
575                epoch: Set(committed_epoch.try_into().unwrap()),
576                table_id: Set(table_id.table_id.into()),
577                version_id: Set(version_id.try_into().unwrap()),
578            };
579            batch.push(m);
580            if batch.len()
581                >= self
582                    .env
583                    .opts
584                    .hummock_time_travel_epoch_version_insert_batch_size
585            {
586                // There should be no conflict rows.
587                hummock_epoch_to_version::Entity::insert_many(std::mem::take(&mut batch))
588                    .do_nothing()
589                    .exec(txn)
590                    .await?;
591            }
592        }
593        if !batch.is_empty() {
594            // There should be no conflict rows.
595            hummock_epoch_to_version::Entity::insert_many(batch)
596                .do_nothing()
597                .exec(txn)
598                .await?;
599        }
600
601        let mut version_sst_ids = None;
602        if let Some(version) = version {
603            // `version_sst_ids` is used to update `last_time_travel_snapshot_sst_ids`.
604            version_sst_ids = Some(
605                version
606                    .get_sst_infos(true)
607                    .filter_map(|s| {
608                        if s.table_ids
609                            .iter()
610                            .any(|tid| time_travel_table_ids.contains(tid))
611                        {
612                            return Some(s.sst_id);
613                        }
614                        None
615                    })
616                    .collect(),
617            );
618            write_sstable_infos(
619                version.get_sst_infos(true).filter(|s| {
620                    !skip_sst_ids.contains(&s.sst_id)
621                        && s.table_ids
622                            .iter()
623                            .any(|tid| time_travel_table_ids.contains(tid))
624                }),
625                txn,
626                self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
627            )
628            .await?;
629            let m = hummock_time_travel_version::ActiveModel {
630                version_id: Set(risingwave_meta_model::HummockVersionId::try_from(
631                    version.id.to_u64(),
632                )
633                .unwrap()),
634                version: Set(
635                    (&IncompleteHummockVersion::from((version, &time_travel_table_ids))
636                        .to_protobuf())
637                        .into(),
638                ),
639            };
640            hummock_time_travel_version::Entity::insert(m)
641                .on_conflict_do_nothing()
642                .exec(txn)
643                .await?;
644            // Return early to skip persisting delta.
645            return Ok(version_sst_ids);
646        }
647        let written = write_sstable_infos(
648            delta.newly_added_sst_infos(true).filter(|s| {
649                !skip_sst_ids.contains(&s.sst_id)
650                    && s.table_ids
651                        .iter()
652                        .any(|tid| time_travel_table_ids.contains(tid))
653            }),
654            txn,
655            self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
656        )
657        .await?;
658        // Ignore delta which adds no data.
659        if written > 0 {
660            let m = hummock_time_travel_delta::ActiveModel {
661                version_id: Set(risingwave_meta_model::HummockVersionId::try_from(
662                    delta.id.to_u64(),
663                )
664                .unwrap()),
665                version_delta: Set((&IncompleteHummockVersionDelta::from((
666                    &delta,
667                    &time_travel_table_ids,
668                ))
669                .to_protobuf())
670                    .into()),
671            };
672            hummock_time_travel_delta::Entity::insert(m)
673                .on_conflict_do_nothing()
674                .exec(txn)
675                .await?;
676        }
677
678        Ok(version_sst_ids)
679    }
680}
681
682/// The `HummockVersion` is actually `InHummockVersion`. It requires `refill_version`.
683fn replay_archive(
684    version: PbHummockVersion,
685    deltas: impl Iterator<Item = PbHummockVersionDelta>,
686) -> HummockVersion {
687    // The pb version ann pb version delta are actually written by InHummockVersion and InHummockVersionDelta, respectively.
688    // Using HummockVersion make it easier for `refill_version` later.
689    let mut last_version = HummockVersion::from_persisted_protobuf(&version);
690    for d in deltas {
691        let d = HummockVersionDelta::from_persisted_protobuf(&d);
692        debug_assert!(
693            !should_mark_next_time_travel_version_snapshot(&d),
694            "unexpected time travel delta {:?}",
695            d
696        );
697        // Need to work around the assertion in `apply_version_delta`.
698        // Because compaction deltas are not included in time travel archive.
699        while last_version.id < d.prev_id {
700            last_version.id = last_version.id + 1;
701        }
702        last_version.apply_version_delta(&d);
703    }
704    last_version
705}
706
707pub fn require_sql_meta_store_err() -> Error {
708    Error::TimeTravel(anyhow!("require SQL meta store"))
709}
710
711/// Time travel delta replay only expect `NewL0SubLevel`. In all other cases, a new version snapshot should be created.
712pub fn should_mark_next_time_travel_version_snapshot(delta: &HummockVersionDelta) -> bool {
713    delta.group_deltas.iter().any(|(_, deltas)| {
714        deltas
715            .group_deltas
716            .iter()
717            .any(|d| !matches!(d, GroupDeltaCommon::NewL0SubLevel(_)))
718    })
719}