risingwave_meta/hummock/manager/
time_travel.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet, VecDeque};
16
17use anyhow::anyhow;
18use futures::TryStreamExt;
19use risingwave_common::catalog::TableId;
20use risingwave_common::system_param::reader::SystemParamsRead;
21use risingwave_common::util::epoch::Epoch;
22use risingwave_hummock_sdk::compaction_group::StateTableId;
23use risingwave_hummock_sdk::sstable_info::SstableInfo;
24use risingwave_hummock_sdk::time_travel::{
25    IncompleteHummockVersion, IncompleteHummockVersionDelta, refill_version,
26};
27use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion, HummockVersionDelta};
28use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, HummockObjectId, HummockSstableId};
29use risingwave_meta_model::hummock_sstable_info::SstableInfoV2Backend;
30use risingwave_meta_model::{
31    HummockVersionId, hummock_epoch_to_version, hummock_sstable_info, hummock_time_travel_delta,
32    hummock_time_travel_version,
33};
34use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta};
35use sea_orm::ActiveValue::Set;
36use sea_orm::{
37    ColumnTrait, Condition, DatabaseTransaction, EntityTrait, PaginatorTrait, QueryFilter,
38    QueryOrder, QuerySelect, TransactionTrait,
39};
40use tracing::info;
41
42use crate::hummock::HummockManager;
43use crate::hummock::error::{Error, Result};
44
45/// Time travel.
46impl HummockManager {
47    pub(crate) async fn init_time_travel_state(&self) -> Result<()> {
48        let sql_store = self.env.meta_store_ref();
49        let mut guard = self.versioning.write().await;
50        guard.mark_next_time_travel_version_snapshot();
51
52        guard.last_time_travel_snapshot_sst_ids = HashSet::new();
53        let Some(version) = hummock_time_travel_version::Entity::find()
54            .order_by_desc(hummock_time_travel_version::Column::VersionId)
55            .one(&sql_store.conn)
56            .await?
57            .map(|v| {
58                IncompleteHummockVersion::from_persisted_protobuf_owned(v.version.to_protobuf())
59            })
60        else {
61            return Ok(());
62        };
63        guard.last_time_travel_snapshot_sst_ids = version.get_sst_ids(true);
64        Ok(())
65    }
66
67    pub(crate) async fn truncate_time_travel_metadata(
68        &self,
69        epoch_watermark: HummockEpoch,
70    ) -> Result<()> {
71        let _timer = self
72            .metrics
73            .time_travel_vacuum_metadata_latency
74            .start_timer();
75        let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
76        let sql_store = self.env.meta_store_ref();
77        let txn = sql_store.conn.begin().await?;
78        let version_watermark = hummock_epoch_to_version::Entity::find()
79            .filter(
80                hummock_epoch_to_version::Column::Epoch
81                    .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
82            )
83            .order_by_desc(hummock_epoch_to_version::Column::Epoch)
84            .order_by_asc(hummock_epoch_to_version::Column::VersionId)
85            .one(&txn)
86            .await?;
87        let Some(version_watermark) = version_watermark else {
88            txn.commit().await?;
89            return Ok(());
90        };
91        let mut watermark_version_id =
92            std::cmp::min(version_watermark.version_id, min_pinned_version_id);
93        if let Some(max_version_count) = self.env.opts.time_travel_vacuum_max_version_count
94            && let Some(earliest_version_id) = hummock_time_travel_version::Entity::find()
95                .select_only()
96                .column(hummock_time_travel_version::Column::VersionId)
97                .order_by_asc(hummock_time_travel_version::Column::VersionId)
98                .into_tuple::<HummockVersionId>()
99                .one(&txn)
100                .await?
101        {
102            watermark_version_id = std::cmp::min(
103                watermark_version_id,
104                HummockVersionId::new(
105                    earliest_version_id
106                        .as_raw_id()
107                        .saturating_add(max_version_count.into()),
108                ),
109            );
110        }
111        let res = hummock_epoch_to_version::Entity::delete_many()
112            .filter(
113                hummock_epoch_to_version::Column::Epoch
114                    .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
115            )
116            .exec(&txn)
117            .await?;
118        tracing::info!(
119            epoch_watermark,
120            "Delete {} rows from hummock_epoch_to_version.",
121            res.rows_affected
122        );
123        let latest_valid_version = hummock_time_travel_version::Entity::find()
124            .filter(hummock_time_travel_version::Column::VersionId.lte(watermark_version_id))
125            .order_by_desc(hummock_time_travel_version::Column::VersionId)
126            .one(&txn)
127            .await?
128            .map(|m| {
129                IncompleteHummockVersion::from_persisted_protobuf_owned(m.version.to_protobuf())
130            });
131        let Some(latest_valid_version) = latest_valid_version else {
132            txn.commit().await?;
133            return Ok(());
134        };
135        let (
136            latest_valid_version_id,
137            latest_valid_version_sst_ids,
138            latest_valid_version_object_ids,
139        ) = {
140            (
141                latest_valid_version.id,
142                latest_valid_version.get_sst_ids(true),
143                latest_valid_version
144                    .get_object_ids(true)
145                    .collect::<HashSet<_>>(),
146            )
147        };
148        let mut object_ids_to_delete: HashSet<_> = HashSet::default();
149        let version_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
150            hummock_time_travel_version::Entity::find()
151                .select_only()
152                .column(hummock_time_travel_version::Column::VersionId)
153                .filter(hummock_time_travel_version::Column::VersionId.lt(latest_valid_version_id))
154                .order_by_desc(hummock_time_travel_version::Column::VersionId)
155                .into_tuple()
156                .all(&txn)
157                .await?;
158        let delta_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
159            hummock_time_travel_delta::Entity::find()
160                .select_only()
161                .column(hummock_time_travel_delta::Column::VersionId)
162                .filter(hummock_time_travel_delta::Column::VersionId.lt(latest_valid_version_id))
163                .into_tuple()
164                .all(&txn)
165                .await?;
166        // Reuse hummock_time_travel_epoch_version_insert_batch_size as threshold.
167        let delete_sst_batch_size = self
168            .env
169            .opts
170            .hummock_time_travel_epoch_version_insert_batch_size;
171        let mut sst_ids_to_delete: HashSet<_> = HashSet::default();
172        async fn delete_sst_in_batch(
173            txn: &DatabaseTransaction,
174            sst_ids_to_delete: HashSet<HummockSstableId>,
175            delete_sst_batch_size: usize,
176        ) -> Result<()> {
177            for start_idx in 0..=(sst_ids_to_delete.len().saturating_sub(1) / delete_sst_batch_size)
178            {
179                hummock_sstable_info::Entity::delete_many()
180                    .filter(
181                        hummock_sstable_info::Column::SstId.is_in(
182                            sst_ids_to_delete
183                                .iter()
184                                .skip(start_idx * delete_sst_batch_size)
185                                .take(delete_sst_batch_size)
186                                .copied(),
187                        ),
188                    )
189                    .exec(txn)
190                    .await?;
191            }
192            Ok(())
193        }
194        for delta_id_to_delete in delta_ids_to_delete {
195            let delta_to_delete = hummock_time_travel_delta::Entity::find_by_id(delta_id_to_delete)
196                .one(&txn)
197                .await?
198                .ok_or_else(|| {
199                    Error::TimeTravel(anyhow!(format!(
200                        "version delta {} not found",
201                        delta_id_to_delete
202                    )))
203                })?;
204            let delta_to_delete = IncompleteHummockVersionDelta::from_persisted_protobuf_owned(
205                delta_to_delete.version_delta.to_protobuf(),
206            );
207            let new_sst_ids = delta_to_delete.newly_added_sst_ids(true);
208            // The SST ids added and then deleted by compaction between the 2 versions.
209            sst_ids_to_delete.extend(&new_sst_ids - &latest_valid_version_sst_ids);
210            if sst_ids_to_delete.len() >= delete_sst_batch_size {
211                delete_sst_in_batch(
212                    &txn,
213                    std::mem::take(&mut sst_ids_to_delete),
214                    delete_sst_batch_size,
215                )
216                .await?;
217            }
218            let new_object_ids = delta_to_delete.newly_added_object_ids(true);
219            object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
220        }
221        let mut next_version_sst_ids = latest_valid_version_sst_ids;
222        for prev_version_id in version_ids_to_delete {
223            let prev_version = {
224                let prev_version = hummock_time_travel_version::Entity::find_by_id(prev_version_id)
225                    .one(&txn)
226                    .await?
227                    .ok_or_else(|| {
228                        Error::TimeTravel(anyhow!(format!(
229                            "prev_version {} not found",
230                            prev_version_id
231                        )))
232                    })?;
233                IncompleteHummockVersion::from_persisted_protobuf_owned(
234                    prev_version.version.to_protobuf(),
235                )
236            };
237            let sst_ids = prev_version.get_sst_ids(true);
238            // The SST ids deleted by compaction between the 2 versions.
239            sst_ids_to_delete.extend(&sst_ids - &next_version_sst_ids);
240            if sst_ids_to_delete.len() >= delete_sst_batch_size {
241                delete_sst_in_batch(
242                    &txn,
243                    std::mem::take(&mut sst_ids_to_delete),
244                    delete_sst_batch_size,
245                )
246                .await?;
247            }
248            let new_object_ids: HashSet<_> = prev_version.get_object_ids(true).collect();
249            object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
250            next_version_sst_ids = sst_ids;
251        }
252        if !sst_ids_to_delete.is_empty() {
253            delete_sst_in_batch(&txn, sst_ids_to_delete, delete_sst_batch_size).await?;
254        }
255
256        if !object_ids_to_delete.is_empty() {
257            // IMPORTANT: object_ids_to_delete may include objects that are still being used by SSTs not included in time travel metadata.
258            // So it's crucial to filter out those objects before actually deleting them, i.e. when using `try_take_may_delete_object_ids`.
259            self.gc_manager
260                .add_may_delete_object_ids(object_ids_to_delete.into_iter());
261        }
262
263        let res = hummock_time_travel_version::Entity::delete_many()
264            .filter(hummock_time_travel_version::Column::VersionId.lt(latest_valid_version_id))
265            .exec(&txn)
266            .await?;
267        tracing::info!(
268            %watermark_version_id,
269            %latest_valid_version_id,
270            "Deleted {} rows from hummock_time_travel_version.",
271            res.rows_affected
272        );
273
274        let res = hummock_time_travel_delta::Entity::delete_many()
275            .filter(hummock_time_travel_delta::Column::VersionId.lt(latest_valid_version_id))
276            .exec(&txn)
277            .await?;
278        tracing::info!(
279            %watermark_version_id,
280            %latest_valid_version_id,
281            "Deleted {} rows from hummock_time_travel_delta.",
282            res.rows_affected
283        );
284
285        txn.commit().await?;
286        Ok(())
287    }
288
289    pub(crate) async fn filter_out_objects_by_time_travel_v1(
290        &self,
291        objects: impl Iterator<Item = HummockObjectId>,
292    ) -> Result<HashSet<HummockObjectId>> {
293        let batch_size = self
294            .env
295            .opts
296            .hummock_time_travel_filter_out_objects_batch_size;
297        info!("filter out objects by time travel v1, only sst will remain in the result set");
298        // The input object count is much smaller than time travel pinned object count in meta store.
299        // So search input object in meta store.
300        let mut result: HashSet<_> = objects
301            .filter(|object_id| match object_id {
302                HummockObjectId::Sstable(_) => true,
303                HummockObjectId::VectorFile(_) | HummockObjectId::HnswGraphFile(_) => false,
304            })
305            .collect();
306        let mut remain_sst: VecDeque<_> = result.iter().copied().collect();
307        while !remain_sst.is_empty() {
308            let batch = remain_sst
309                .drain(..std::cmp::min(remain_sst.len(), batch_size))
310                .map(|object_id| object_id.as_raw());
311            let reject_object_ids: Vec<risingwave_meta_model::HummockSstableObjectId> =
312                hummock_sstable_info::Entity::find()
313                    .filter(hummock_sstable_info::Column::ObjectId.is_in(batch))
314                    .select_only()
315                    .column(hummock_sstable_info::Column::ObjectId)
316                    .into_tuple()
317                    .all(&self.env.meta_store_ref().conn)
318                    .await?;
319            for reject in reject_object_ids {
320                let object_id = HummockObjectId::Sstable(reject);
321                result.remove(&object_id);
322            }
323        }
324        Ok(result)
325    }
326
327    pub(crate) async fn filter_out_objects_by_time_travel(
328        &self,
329        objects: impl Iterator<Item = HummockObjectId>,
330    ) -> Result<HashSet<HummockObjectId>> {
331        if self.env.opts.hummock_time_travel_filter_out_objects_v1 {
332            return self.filter_out_objects_by_time_travel_v1(objects).await;
333        }
334        let mut result: HashSet<_> = objects.collect();
335
336        // filtered out object id pinned by time travel hummock version
337        {
338            let mut prev_version_id: Option<HummockVersionId> = None;
339            loop {
340                let query = hummock_time_travel_version::Entity::find();
341                let query = if let Some(prev_version_id) = prev_version_id {
342                    query.filter(hummock_time_travel_version::Column::VersionId.gt(prev_version_id))
343                } else {
344                    query
345                };
346                let mut version_stream = query
347                    .order_by_asc(hummock_time_travel_version::Column::VersionId)
348                    .limit(
349                        self.env
350                            .opts
351                            .hummock_time_travel_filter_out_objects_list_version_batch_size
352                            as u64,
353                    )
354                    .stream(&self.env.meta_store_ref().conn)
355                    .await?;
356                let mut next_prev_version_id = None;
357                while let Some(model) = version_stream.try_next().await? {
358                    let version =
359                        HummockVersion::from_persisted_protobuf_owned(model.version.to_protobuf());
360                    for object_id in version.get_object_ids(true) {
361                        result.remove(&object_id);
362                    }
363                    next_prev_version_id = Some(model.version_id);
364                }
365                if let Some(next_prev_version_id) = next_prev_version_id {
366                    prev_version_id = Some(next_prev_version_id);
367                } else {
368                    break;
369                }
370            }
371        }
372
373        // filtered out object ids pinned by time travel hummock version delta
374        {
375            let mut prev_version_id: Option<HummockVersionId> = None;
376            loop {
377                let query = hummock_time_travel_delta::Entity::find();
378                let query = if let Some(prev_version_id) = prev_version_id {
379                    query.filter(hummock_time_travel_delta::Column::VersionId.gt(prev_version_id))
380                } else {
381                    query
382                };
383                let mut version_stream = query
384                    .order_by_asc(hummock_time_travel_delta::Column::VersionId)
385                    .limit(
386                        self.env
387                            .opts
388                            .hummock_time_travel_filter_out_objects_list_delta_batch_size
389                            as u64,
390                    )
391                    .stream(&self.env.meta_store_ref().conn)
392                    .await?;
393                let mut next_prev_version_id = None;
394                while let Some(model) = version_stream.try_next().await? {
395                    let version_delta = HummockVersionDelta::from_persisted_protobuf_owned(
396                        model.version_delta.to_protobuf(),
397                    );
398                    // set exclude_table_change_log to true because in time travel delta we ignore the table change log
399                    for object_id in version_delta.newly_added_object_ids(true) {
400                        result.remove(&object_id);
401                    }
402                    next_prev_version_id = Some(model.version_id);
403                }
404                if let Some(next_prev_version_id) = next_prev_version_id {
405                    prev_version_id = Some(next_prev_version_id);
406                } else {
407                    break;
408                }
409            }
410        }
411
412        Ok(result)
413    }
414
415    pub(crate) async fn time_travel_pinned_object_count(&self) -> Result<u64> {
416        let count = hummock_sstable_info::Entity::find()
417            .count(&self.env.meta_store_ref().conn)
418            .await?;
419        Ok(count)
420    }
421
422    /// Attempt to locate the version corresponding to `query_epoch`.
423    ///
424    /// The version is retrieved from `hummock_epoch_to_version`, selecting the entry with the largest epoch that's lte `query_epoch`.
425    ///
426    /// The resulted version is complete, i.e. with correct `SstableInfo`.
427    pub async fn epoch_to_version(
428        &self,
429        query_epoch: HummockEpoch,
430        table_id: TableId,
431    ) -> Result<HummockVersion> {
432        let sql_store = self.env.meta_store_ref();
433        let _permit = self.inflight_time_travel_query.try_acquire().map_err(|_| {
434            anyhow!(format!(
435                "too many inflight time travel queries, max_inflight_time_travel_query={}",
436                self.env.opts.max_inflight_time_travel_query
437            ))
438        })?;
439        let epoch_to_version = hummock_epoch_to_version::Entity::find()
440            .filter(
441                Condition::any()
442                    .add(
443                        hummock_epoch_to_version::Column::TableId
444                            .eq(i64::from(table_id.as_raw_id())),
445                    )
446                    // for backward compatibility
447                    .add(hummock_epoch_to_version::Column::TableId.eq(0)),
448            )
449            .filter(
450                hummock_epoch_to_version::Column::Epoch
451                    .lte(risingwave_meta_model::Epoch::try_from(query_epoch).unwrap()),
452            )
453            .order_by_desc(hummock_epoch_to_version::Column::Epoch)
454            .one(&sql_store.conn)
455            .await?
456            .ok_or_else(|| {
457                Error::TimeTravel(anyhow!(format!(
458                    "version not found for epoch {}",
459                    query_epoch
460                )))
461            })?;
462        let timer = self
463            .metrics
464            .time_travel_version_replay_latency
465            .start_timer();
466        let actual_version_id = epoch_to_version.version_id;
467        tracing::debug!(
468            query_epoch,
469            query_tz = ?(Epoch(query_epoch).as_timestamptz()),
470            actual_epoch = epoch_to_version.epoch,
471            actual_tz = ?(Epoch(u64::try_from(epoch_to_version.epoch).unwrap()).as_timestamptz()),
472            %actual_version_id,
473            "convert query epoch"
474        );
475
476        let replay_version = hummock_time_travel_version::Entity::find()
477            .filter(hummock_time_travel_version::Column::VersionId.lte(actual_version_id))
478            .order_by_desc(hummock_time_travel_version::Column::VersionId)
479            .one(&sql_store.conn)
480            .await?
481            .ok_or_else(|| {
482                Error::TimeTravel(anyhow!(format!(
483                    "no replay version found for epoch {}, version {}",
484                    query_epoch, actual_version_id,
485                )))
486            })?;
487        let deltas = hummock_time_travel_delta::Entity::find()
488            .filter(hummock_time_travel_delta::Column::VersionId.gt(replay_version.version_id))
489            .filter(hummock_time_travel_delta::Column::VersionId.lte(actual_version_id))
490            .order_by_asc(hummock_time_travel_delta::Column::VersionId)
491            .all(&sql_store.conn)
492            .await?;
493        // SstableInfo in actual_version is incomplete before refill_version.
494        let mut actual_version = replay_archive(
495            replay_version.version.to_protobuf(),
496            deltas.into_iter().map(|d| d.version_delta.to_protobuf()),
497        );
498
499        let mut sst_ids = actual_version
500            .get_sst_ids(true)
501            .into_iter()
502            .collect::<VecDeque<_>>();
503        let sst_count = sst_ids.len();
504        let mut sst_id_to_info = HashMap::with_capacity(sst_count);
505        let sst_info_fetch_batch_size = self.env.opts.hummock_time_travel_sst_info_fetch_batch_size;
506        while !sst_ids.is_empty() {
507            let sst_infos = hummock_sstable_info::Entity::find()
508                .filter(hummock_sstable_info::Column::SstId.is_in(
509                    sst_ids.drain(..std::cmp::min(sst_info_fetch_batch_size, sst_ids.len())),
510                ))
511                .all(&sql_store.conn)
512                .await?;
513            for sst_info in sst_infos {
514                let sst_info: SstableInfo = sst_info.sstable_info.to_protobuf().into();
515                sst_id_to_info.insert(sst_info.sst_id, sst_info);
516            }
517        }
518        if sst_count != sst_id_to_info.len() {
519            return Err(Error::TimeTravel(anyhow!(format!(
520                "some SstableInfos not found for epoch {}, version {}",
521                query_epoch, actual_version_id,
522            ))));
523        }
524        refill_version(&mut actual_version, &sst_id_to_info, table_id);
525        timer.observe_duration();
526        Ok(actual_version)
527    }
528
529    pub(crate) async fn write_time_travel_metadata(
530        &self,
531        txn: &DatabaseTransaction,
532        version: Option<&HummockVersion>,
533        delta: HummockVersionDelta,
534        time_travel_table_ids: HashSet<StateTableId>,
535        skip_sst_ids: &HashSet<HummockSstableId>,
536        tables_to_commit: impl Iterator<Item = (&TableId, &CompactionGroupId, u64)>,
537    ) -> Result<Option<HashSet<HummockSstableId>>> {
538        let _timer = self
539            .metrics
540            .time_travel_write_metadata_latency
541            .start_timer();
542        if self
543            .env
544            .system_params_reader()
545            .await
546            .time_travel_retention_ms()
547            == 0
548        {
549            return Ok(None);
550        }
551        async fn write_sstable_infos(
552            mut sst_infos: impl Iterator<Item = &SstableInfo>,
553            txn: &DatabaseTransaction,
554            batch_size: usize,
555        ) -> Result<usize> {
556            let mut count = 0;
557            let mut is_finished = false;
558            while !is_finished {
559                let mut remain = batch_size;
560                let mut batch = vec![];
561                while remain > 0 {
562                    let Some(sst_info) = sst_infos.next() else {
563                        is_finished = true;
564                        break;
565                    };
566                    batch.push(hummock_sstable_info::ActiveModel {
567                        sst_id: Set(sst_info.sst_id),
568                        object_id: Set(sst_info.object_id),
569                        sstable_info: Set(SstableInfoV2Backend::from(&sst_info.to_protobuf())),
570                    });
571                    remain -= 1;
572                    count += 1;
573                }
574                if batch.is_empty() {
575                    break;
576                }
577                hummock_sstable_info::Entity::insert_many(batch)
578                    .on_conflict_do_nothing()
579                    .exec(txn)
580                    .await?;
581            }
582            Ok(count)
583        }
584
585        let mut batch = vec![];
586        for (table_id, _cg_id, committed_epoch) in tables_to_commit {
587            let m = hummock_epoch_to_version::ActiveModel {
588                epoch: Set(committed_epoch.try_into().unwrap()),
589                table_id: Set(i64::from(table_id.as_raw_id())),
590                version_id: Set(delta.id),
591            };
592            batch.push(m);
593            if batch.len()
594                >= self
595                    .env
596                    .opts
597                    .hummock_time_travel_epoch_version_insert_batch_size
598            {
599                // There should be no conflict rows.
600                hummock_epoch_to_version::Entity::insert_many(std::mem::take(&mut batch))
601                    .do_nothing()
602                    .exec(txn)
603                    .await?;
604            }
605        }
606        if !batch.is_empty() {
607            // There should be no conflict rows.
608            hummock_epoch_to_version::Entity::insert_many(batch)
609                .do_nothing()
610                .exec(txn)
611                .await?;
612        }
613
614        let mut version_sst_ids = None;
615        if let Some(version) = version {
616            // `version_sst_ids` is used to update `last_time_travel_snapshot_sst_ids`.
617            version_sst_ids = Some(
618                version
619                    .get_sst_infos(true)
620                    .filter_map(|s| {
621                        if s.table_ids
622                            .iter()
623                            .any(|tid| time_travel_table_ids.contains(tid))
624                        {
625                            return Some(s.sst_id);
626                        }
627                        None
628                    })
629                    .collect(),
630            );
631            write_sstable_infos(
632                version.get_sst_infos(true).filter(|s| {
633                    !skip_sst_ids.contains(&s.sst_id)
634                        && s.table_ids
635                            .iter()
636                            .any(|tid| time_travel_table_ids.contains(tid))
637                }),
638                txn,
639                self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
640            )
641            .await?;
642            let m = hummock_time_travel_version::ActiveModel {
643                version_id: Set(version.id),
644                version: Set(
645                    (&IncompleteHummockVersion::from((version, &time_travel_table_ids))
646                        .to_protobuf())
647                        .into(),
648                ),
649            };
650            hummock_time_travel_version::Entity::insert(m)
651                .on_conflict_do_nothing()
652                .exec(txn)
653                .await?;
654            // Return early to skip persisting delta.
655            return Ok(version_sst_ids);
656        }
657        let written = write_sstable_infos(
658            delta.newly_added_sst_infos(true).filter(|s| {
659                !skip_sst_ids.contains(&s.sst_id)
660                    && s.table_ids
661                        .iter()
662                        .any(|tid| time_travel_table_ids.contains(tid))
663            }),
664            txn,
665            self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
666        )
667        .await?;
668        let has_state_table_info_delta = delta
669            .state_table_info_delta
670            .keys()
671            .any(|table_id| time_travel_table_ids.contains(table_id));
672        if written > 0 || has_state_table_info_delta {
673            let m = hummock_time_travel_delta::ActiveModel {
674                version_id: Set(delta.id),
675                version_delta: Set((&IncompleteHummockVersionDelta::from((
676                    &delta,
677                    &time_travel_table_ids,
678                ))
679                .to_protobuf())
680                    .into()),
681            };
682            hummock_time_travel_delta::Entity::insert(m)
683                .on_conflict_do_nothing()
684                .exec(txn)
685                .await?;
686        }
687
688        Ok(version_sst_ids)
689    }
690}
691
692/// The `HummockVersion` is actually `InHummockVersion`. It requires `refill_version`.
693fn replay_archive(
694    version: PbHummockVersion,
695    deltas: impl Iterator<Item = PbHummockVersionDelta>,
696) -> HummockVersion {
697    // The pb version ann pb version delta are actually written by InHummockVersion and InHummockVersionDelta, respectively.
698    // Using HummockVersion make it easier for `refill_version` later.
699    let mut last_version = HummockVersion::from_persisted_protobuf_owned(version);
700    for d in deltas {
701        let d = HummockVersionDelta::from_persisted_protobuf_owned(d);
702        debug_assert!(
703            !should_mark_next_time_travel_version_snapshot(&d),
704            "unexpected time travel delta {:?}",
705            d
706        );
707        // Need to work around the assertion in `apply_version_delta`.
708        // Because compaction deltas are not included in time travel archive.
709        while last_version.id < d.prev_id {
710            last_version.id += 1;
711        }
712        last_version.apply_version_delta(&d);
713    }
714    last_version
715}
716
717pub fn require_sql_meta_store_err() -> Error {
718    Error::TimeTravel(anyhow!("require SQL meta store"))
719}
720
721/// Time travel delta replay only expect `NewL0SubLevel`. In all other cases, a new version snapshot should be created.
722pub fn should_mark_next_time_travel_version_snapshot(delta: &HummockVersionDelta) -> bool {
723    delta.group_deltas.iter().any(|(_, deltas)| {
724        deltas
725            .group_deltas
726            .iter()
727            .any(|d| !matches!(d, GroupDeltaCommon::NewL0SubLevel(_)))
728    })
729}