risingwave_meta/hummock/manager/time_travel.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet, VecDeque};
16
17use anyhow::anyhow;
18use futures::TryStreamExt;
19use risingwave_common::catalog::TableId;
20use risingwave_common::system_param::reader::SystemParamsRead;
21use risingwave_common::util::epoch::Epoch;
22use risingwave_hummock_sdk::compaction_group::StateTableId;
23use risingwave_hummock_sdk::sstable_info::SstableInfo;
24use risingwave_hummock_sdk::time_travel::{
25    IncompleteHummockVersion, IncompleteHummockVersionDelta, refill_version,
26};
27use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion, HummockVersionDelta};
28use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, HummockObjectId, HummockSstableId};
29use risingwave_meta_model::hummock_sstable_info::SstableInfoV2Backend;
30use risingwave_meta_model::{
31    HummockVersionId, hummock_epoch_to_version, hummock_sstable_info, hummock_time_travel_delta,
32    hummock_time_travel_version,
33};
34use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta};
35use sea_orm::ActiveValue::Set;
36use sea_orm::{
37    ColumnTrait, Condition, DatabaseTransaction, EntityTrait, PaginatorTrait, QueryFilter,
38    QueryOrder, QuerySelect, TransactionTrait,
39};
40use tracing::info;
41
42use crate::hummock::HummockManager;
43use crate::hummock::error::{Error, Result};
44
45/// Time travel.
46impl HummockManager {
47    pub(crate) async fn init_time_travel_state(&self) -> Result<()> {
48        let sql_store = self.env.meta_store_ref();
49        let mut guard = self.versioning.write().await;
50        guard.mark_next_time_travel_version_snapshot();
51
52        guard.last_time_travel_snapshot_sst_ids = HashSet::new();
53        let Some(version) = hummock_time_travel_version::Entity::find()
54            .order_by_desc(hummock_time_travel_version::Column::VersionId)
55            .one(&sql_store.conn)
56            .await?
57            .map(|v| IncompleteHummockVersion::from_persisted_protobuf(&v.version.to_protobuf()))
58        else {
59            return Ok(());
60        };
61        guard.last_time_travel_snapshot_sst_ids = version.get_sst_ids(true);
62        Ok(())
63    }
64
65    pub(crate) async fn truncate_time_travel_metadata(
66        &self,
67        epoch_watermark: HummockEpoch,
68    ) -> Result<()> {
69        let _timer = self
70            .metrics
71            .time_travel_vacuum_metadata_latency
72            .start_timer();
73        let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
74        let sql_store = self.env.meta_store_ref();
75        let txn = sql_store.conn.begin().await?;
76        let version_watermark = hummock_epoch_to_version::Entity::find()
77            .filter(
78                hummock_epoch_to_version::Column::Epoch
79                    .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
80            )
81            .order_by_desc(hummock_epoch_to_version::Column::Epoch)
82            .order_by_asc(hummock_epoch_to_version::Column::VersionId)
83            .one(&txn)
84            .await?;
85        let Some(version_watermark) = version_watermark else {
86            txn.commit().await?;
87            return Ok(());
88        };
89        let mut watermark_version_id =
90            std::cmp::min(version_watermark.version_id, min_pinned_version_id);
91        if let Some(max_version_count) = self.env.opts.time_travel_vacuum_max_version_count
92            && let Some(earliest_version_id) = hummock_time_travel_version::Entity::find()
93                .select_only()
94                .column(hummock_time_travel_version::Column::VersionId)
95                .order_by_asc(hummock_time_travel_version::Column::VersionId)
96                .into_tuple::<HummockVersionId>()
97                .one(&txn)
98                .await?
99        {
100            watermark_version_id = std::cmp::min(
101                watermark_version_id,
102                HummockVersionId::new(
103                    earliest_version_id
104                        .as_raw_id()
105                        .saturating_add(max_version_count.into()),
106                ),
107            );
108        }
109        let res = hummock_epoch_to_version::Entity::delete_many()
110            .filter(
111                hummock_epoch_to_version::Column::Epoch
112                    .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
113            )
114            .exec(&txn)
115            .await?;
116        tracing::info!(
117            epoch_watermark,
118            "Delete {} rows from hummock_epoch_to_version.",
119            res.rows_affected
120        );
121        let latest_valid_version = hummock_time_travel_version::Entity::find()
122            .filter(hummock_time_travel_version::Column::VersionId.lte(watermark_version_id))
123            .order_by_desc(hummock_time_travel_version::Column::VersionId)
124            .one(&txn)
125            .await?
126            .map(|m| IncompleteHummockVersion::from_persisted_protobuf(&m.version.to_protobuf()));
127        let Some(latest_valid_version) = latest_valid_version else {
128            txn.commit().await?;
129            return Ok(());
130        };
131        let (
132            latest_valid_version_id,
133            latest_valid_version_sst_ids,
134            latest_valid_version_object_ids,
135        ) = {
136            (
137                latest_valid_version.id,
138                latest_valid_version.get_sst_ids(true),
139                latest_valid_version
140                    .get_object_ids(true)
141                    .collect::<HashSet<_>>(),
142            )
143        };
144        let mut object_ids_to_delete: HashSet<_> = HashSet::default();
145        let version_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
146            hummock_time_travel_version::Entity::find()
147                .select_only()
148                .column(hummock_time_travel_version::Column::VersionId)
149                .filter(hummock_time_travel_version::Column::VersionId.lt(latest_valid_version_id))
150                .order_by_desc(hummock_time_travel_version::Column::VersionId)
151                .into_tuple()
152                .all(&txn)
153                .await?;
154        let delta_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
155            hummock_time_travel_delta::Entity::find()
156                .select_only()
157                .column(hummock_time_travel_delta::Column::VersionId)
158                .filter(hummock_time_travel_delta::Column::VersionId.lt(latest_valid_version_id))
159                .into_tuple()
160                .all(&txn)
161                .await?;
162        // Reuse hummock_time_travel_epoch_version_insert_batch_size as threshold.
163        let delete_sst_batch_size = self
164            .env
165            .opts
166            .hummock_time_travel_epoch_version_insert_batch_size;
167        let mut sst_ids_to_delete: HashSet<_> = HashSet::default();
168        async fn delete_sst_in_batch(
169            txn: &DatabaseTransaction,
170            sst_ids_to_delete: HashSet<HummockSstableId>,
171            delete_sst_batch_size: usize,
172        ) -> Result<()> {
173            for start_idx in 0..=(sst_ids_to_delete.len().saturating_sub(1) / delete_sst_batch_size)
174            {
175                hummock_sstable_info::Entity::delete_many()
176                    .filter(
177                        hummock_sstable_info::Column::SstId.is_in(
178                            sst_ids_to_delete
179                                .iter()
180                                .skip(start_idx * delete_sst_batch_size)
181                                .take(delete_sst_batch_size)
182                                .copied(),
183                        ),
184                    )
185                    .exec(txn)
186                    .await?;
187            }
188            Ok(())
189        }
190        for delta_id_to_delete in delta_ids_to_delete {
191            let delta_to_delete = hummock_time_travel_delta::Entity::find_by_id(delta_id_to_delete)
192                .one(&txn)
193                .await?
194                .ok_or_else(|| {
195                    Error::TimeTravel(anyhow!(format!(
196                        "version delta {} not found",
197                        delta_id_to_delete
198                    )))
199                })?;
200            let delta_to_delete = IncompleteHummockVersionDelta::from_persisted_protobuf(
201                &delta_to_delete.version_delta.to_protobuf(),
202            );
203            let new_sst_ids = delta_to_delete.newly_added_sst_ids(true);
204            // The SST ids added and then deleted by compaction between the 2 versions.
205            sst_ids_to_delete.extend(&new_sst_ids - &latest_valid_version_sst_ids);
206            if sst_ids_to_delete.len() >= delete_sst_batch_size {
207                delete_sst_in_batch(
208                    &txn,
209                    std::mem::take(&mut sst_ids_to_delete),
210                    delete_sst_batch_size,
211                )
212                .await?;
213            }
214            let new_object_ids = delta_to_delete.newly_added_object_ids(true);
215            object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
216        }
217        let mut next_version_sst_ids = latest_valid_version_sst_ids;
218        for prev_version_id in version_ids_to_delete {
219            let prev_version = {
220                let prev_version = hummock_time_travel_version::Entity::find_by_id(prev_version_id)
221                    .one(&txn)
222                    .await?
223                    .ok_or_else(|| {
224                        Error::TimeTravel(anyhow!(format!(
225                            "prev_version {} not found",
226                            prev_version_id
227                        )))
228                    })?;
229                IncompleteHummockVersion::from_persisted_protobuf(
230                    &prev_version.version.to_protobuf(),
231                )
232            };
233            let sst_ids = prev_version.get_sst_ids(true);
234            // The SST ids deleted by compaction between the 2 versions.
235            sst_ids_to_delete.extend(&sst_ids - &next_version_sst_ids);
236            if sst_ids_to_delete.len() >= delete_sst_batch_size {
237                delete_sst_in_batch(
238                    &txn,
239                    std::mem::take(&mut sst_ids_to_delete),
240                    delete_sst_batch_size,
241                )
242                .await?;
243            }
244            let new_object_ids: HashSet<_> = prev_version.get_object_ids(true).collect();
245            object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
246            next_version_sst_ids = sst_ids;
247        }
248        if !sst_ids_to_delete.is_empty() {
249            delete_sst_in_batch(&txn, sst_ids_to_delete, delete_sst_batch_size).await?;
250        }
251
252        if !object_ids_to_delete.is_empty() {
253            // IMPORTANT: object_ids_to_delete may include objects that are still being used by SSTs not included in time travel metadata.
254            // So it's crucial to filter out those objects before actually deleting them, i.e. when using `try_take_may_delete_object_ids`.
255            self.gc_manager
256                .add_may_delete_object_ids(object_ids_to_delete.into_iter());
257        }
258
259        let res = hummock_time_travel_version::Entity::delete_many()
260            .filter(hummock_time_travel_version::Column::VersionId.lt(latest_valid_version_id))
261            .exec(&txn)
262            .await?;
263        tracing::info!(
264            %watermark_version_id,
265            %latest_valid_version_id,
266            "Deleted {} rows from hummock_time_travel_version.",
267            res.rows_affected
268        );
269
270        let res = hummock_time_travel_delta::Entity::delete_many()
271            .filter(hummock_time_travel_delta::Column::VersionId.lt(latest_valid_version_id))
272            .exec(&txn)
273            .await?;
274        tracing::info!(
275            %watermark_version_id,
276            %latest_valid_version_id,
277            "Deleted {} rows from hummock_time_travel_delta.",
278            res.rows_affected
279        );
280
281        txn.commit().await?;
282        Ok(())
283    }
284
285    pub(crate) async fn filter_out_objects_by_time_travel_v1(
286        &self,
287        objects: impl Iterator<Item = HummockObjectId>,
288    ) -> Result<HashSet<HummockObjectId>> {
289        let batch_size = self
290            .env
291            .opts
292            .hummock_time_travel_filter_out_objects_batch_size;
293        info!("filter out objects by time travel v1, only sst will remain in the result set");
294        // The input object count is much smaller than time travel pinned object count in meta store.
295        // So search input object in meta store.
296        let mut result: HashSet<_> = objects
297            .filter(|object_id| match object_id {
298                HummockObjectId::Sstable(_) => true,
299                HummockObjectId::VectorFile(_) | HummockObjectId::HnswGraphFile(_) => false,
300            })
301            .collect();
302        let mut remain_sst: VecDeque<_> = result.iter().copied().collect();
303        while !remain_sst.is_empty() {
304            let batch = remain_sst
305                .drain(..std::cmp::min(remain_sst.len(), batch_size))
306                .map(|object_id| object_id.as_raw());
307            let reject_object_ids: Vec<risingwave_meta_model::HummockSstableObjectId> =
308                hummock_sstable_info::Entity::find()
309                    .filter(hummock_sstable_info::Column::ObjectId.is_in(batch))
310                    .select_only()
311                    .column(hummock_sstable_info::Column::ObjectId)
312                    .into_tuple()
313                    .all(&self.env.meta_store_ref().conn)
314                    .await?;
315            for reject in reject_object_ids {
316                let object_id = HummockObjectId::Sstable(reject);
317                result.remove(&object_id);
318            }
319        }
320        Ok(result)
321    }
322
323    pub(crate) async fn filter_out_objects_by_time_travel(
324        &self,
325        objects: impl Iterator<Item = HummockObjectId>,
326    ) -> Result<HashSet<HummockObjectId>> {
327        if self.env.opts.hummock_time_travel_filter_out_objects_v1 {
328            return self.filter_out_objects_by_time_travel_v1(objects).await;
329        }
330        let mut result: HashSet<_> = objects.collect();
331
332        // filtered out object id pinned by time travel hummock version
333        {
334            let mut prev_version_id: Option<HummockVersionId> = None;
335            loop {
336                let query = hummock_time_travel_version::Entity::find();
337                let query = if let Some(prev_version_id) = prev_version_id {
338                    query.filter(hummock_time_travel_version::Column::VersionId.gt(prev_version_id))
339                } else {
340                    query
341                };
342                let mut version_stream = query
343                    .order_by_asc(hummock_time_travel_version::Column::VersionId)
344                    .limit(
345                        self.env
346                            .opts
347                            .hummock_time_travel_filter_out_objects_list_version_batch_size
348                            as u64,
349                    )
350                    .stream(&self.env.meta_store_ref().conn)
351                    .await?;
352                let mut next_prev_version_id = None;
353                while let Some(model) = version_stream.try_next().await? {
354                    let version =
355                        HummockVersion::from_persisted_protobuf(&model.version.to_protobuf());
356                    for object_id in version.get_object_ids(true) {
357                        result.remove(&object_id);
358                    }
359                    next_prev_version_id = Some(model.version_id);
360                }
361                if let Some(next_prev_version_id) = next_prev_version_id {
362                    prev_version_id = Some(next_prev_version_id);
363                } else {
364                    break;
365                }
366            }
367        }
368
369        // filtered out object ids pinned by time travel hummock version delta
370        {
371            let mut prev_version_id: Option<HummockVersionId> = None;
372            loop {
373                let query = hummock_time_travel_delta::Entity::find();
374                let query = if let Some(prev_version_id) = prev_version_id {
375                    query.filter(hummock_time_travel_delta::Column::VersionId.gt(prev_version_id))
376                } else {
377                    query
378                };
379                let mut version_stream = query
380                    .order_by_asc(hummock_time_travel_delta::Column::VersionId)
381                    .limit(
382                        self.env
383                            .opts
384                            .hummock_time_travel_filter_out_objects_list_delta_batch_size
385                            as u64,
386                    )
387                    .stream(&self.env.meta_store_ref().conn)
388                    .await?;
389                let mut next_prev_version_id = None;
390                while let Some(model) = version_stream.try_next().await? {
391                    let version_delta = HummockVersionDelta::from_persisted_protobuf(
392                        &model.version_delta.to_protobuf(),
393                    );
394                    // set exclude_table_change_log to true because in time travel delta we ignore the table change log
395                    for object_id in version_delta.newly_added_object_ids(true) {
396                        result.remove(&object_id);
397                    }
398                    next_prev_version_id = Some(model.version_id);
399                }
400                if let Some(next_prev_version_id) = next_prev_version_id {
401                    prev_version_id = Some(next_prev_version_id);
402                } else {
403                    break;
404                }
405            }
406        }
407
408        Ok(result)
409    }
410
411    pub(crate) async fn time_travel_pinned_object_count(&self) -> Result<u64> {
412        let count = hummock_sstable_info::Entity::find()
413            .count(&self.env.meta_store_ref().conn)
414            .await?;
415        Ok(count)
416    }
417
418    /// Attempt to locate the version corresponding to `query_epoch`.
419    ///
420    /// The version is retrieved from `hummock_epoch_to_version`, selecting the entry with the largest epoch that's lte `query_epoch`.
421    ///
422    /// The resulted version is complete, i.e. with correct `SstableInfo`.
423    pub async fn epoch_to_version(
424        &self,
425        query_epoch: HummockEpoch,
426        table_id: TableId,
427    ) -> Result<HummockVersion> {
428        let sql_store = self.env.meta_store_ref();
429        let _permit = self.inflight_time_travel_query.try_acquire().map_err(|_| {
430            anyhow!(format!(
431                "too many inflight time travel queries, max_inflight_time_travel_query={}",
432                self.env.opts.max_inflight_time_travel_query
433            ))
434        })?;
435        let epoch_to_version = hummock_epoch_to_version::Entity::find()
436            .filter(
437                Condition::any()
438                    .add(
439                        hummock_epoch_to_version::Column::TableId
440                            .eq(i64::from(table_id.as_raw_id())),
441                    )
442                    // for backward compatibility
443                    .add(hummock_epoch_to_version::Column::TableId.eq(0)),
444            )
445            .filter(
446                hummock_epoch_to_version::Column::Epoch
447                    .lte(risingwave_meta_model::Epoch::try_from(query_epoch).unwrap()),
448            )
449            .order_by_desc(hummock_epoch_to_version::Column::Epoch)
450            .one(&sql_store.conn)
451            .await?
452            .ok_or_else(|| {
453                Error::TimeTravel(anyhow!(format!(
454                    "version not found for epoch {}",
455                    query_epoch
456                )))
457            })?;
458        let timer = self
459            .metrics
460            .time_travel_version_replay_latency
461            .start_timer();
462        let actual_version_id = epoch_to_version.version_id;
463        tracing::debug!(
464            query_epoch,
465            query_tz = ?(Epoch(query_epoch).as_timestamptz()),
466            actual_epoch = epoch_to_version.epoch,
467            actual_tz = ?(Epoch(u64::try_from(epoch_to_version.epoch).unwrap()).as_timestamptz()),
468            %actual_version_id,
469            "convert query epoch"
470        );
471
472        let replay_version = hummock_time_travel_version::Entity::find()
473            .filter(hummock_time_travel_version::Column::VersionId.lte(actual_version_id))
474            .order_by_desc(hummock_time_travel_version::Column::VersionId)
475            .one(&sql_store.conn)
476            .await?
477            .ok_or_else(|| {
478                Error::TimeTravel(anyhow!(format!(
479                    "no replay version found for epoch {}, version {}",
480                    query_epoch, actual_version_id,
481                )))
482            })?;
483        let deltas = hummock_time_travel_delta::Entity::find()
484            .filter(hummock_time_travel_delta::Column::VersionId.gt(replay_version.version_id))
485            .filter(hummock_time_travel_delta::Column::VersionId.lte(actual_version_id))
486            .order_by_asc(hummock_time_travel_delta::Column::VersionId)
487            .all(&sql_store.conn)
488            .await?;
489        // SstableInfo in actual_version is incomplete before refill_version.
490        let mut actual_version = replay_archive(
491            replay_version.version.to_protobuf(),
492            deltas.into_iter().map(|d| d.version_delta.to_protobuf()),
493        );
494
495        let mut sst_ids = actual_version
496            .get_sst_ids(true)
497            .into_iter()
498            .collect::<VecDeque<_>>();
499        let sst_count = sst_ids.len();
500        let mut sst_id_to_info = HashMap::with_capacity(sst_count);
501        let sst_info_fetch_batch_size = self.env.opts.hummock_time_travel_sst_info_fetch_batch_size;
502        while !sst_ids.is_empty() {
503            let sst_infos = hummock_sstable_info::Entity::find()
504                .filter(hummock_sstable_info::Column::SstId.is_in(
505                    sst_ids.drain(..std::cmp::min(sst_info_fetch_batch_size, sst_ids.len())),
506                ))
507                .all(&sql_store.conn)
508                .await?;
509            for sst_info in sst_infos {
510                let sst_info: SstableInfo = sst_info.sstable_info.to_protobuf().into();
511                sst_id_to_info.insert(sst_info.sst_id, sst_info);
512            }
513        }
514        if sst_count != sst_id_to_info.len() {
515            return Err(Error::TimeTravel(anyhow!(format!(
516                "some SstableInfos not found for epoch {}, version {}",
517                query_epoch, actual_version_id,
518            ))));
519        }
520        refill_version(&mut actual_version, &sst_id_to_info, table_id);
521        timer.observe_duration();
522        Ok(actual_version)
523    }
524
525    pub(crate) async fn write_time_travel_metadata(
526        &self,
527        txn: &DatabaseTransaction,
528        version: Option<&HummockVersion>,
529        delta: HummockVersionDelta,
530        time_travel_table_ids: HashSet<StateTableId>,
531        skip_sst_ids: &HashSet<HummockSstableId>,
532        tables_to_commit: impl Iterator<Item = (&TableId, &CompactionGroupId, u64)>,
533    ) -> Result<Option<HashSet<HummockSstableId>>> {
534        let _timer = self
535            .metrics
536            .time_travel_write_metadata_latency
537            .start_timer();
538        if self
539            .env
540            .system_params_reader()
541            .await
542            .time_travel_retention_ms()
543            == 0
544        {
545            return Ok(None);
546        }
547        async fn write_sstable_infos(
548            mut sst_infos: impl Iterator<Item = &SstableInfo>,
549            txn: &DatabaseTransaction,
550            batch_size: usize,
551        ) -> Result<usize> {
552            let mut count = 0;
553            let mut is_finished = false;
554            while !is_finished {
555                let mut remain = batch_size;
556                let mut batch = vec![];
557                while remain > 0 {
558                    let Some(sst_info) = sst_infos.next() else {
559                        is_finished = true;
560                        break;
561                    };
562                    batch.push(hummock_sstable_info::ActiveModel {
563                        sst_id: Set(sst_info.sst_id),
564                        object_id: Set(sst_info.object_id),
565                        sstable_info: Set(SstableInfoV2Backend::from(&sst_info.to_protobuf())),
566                    });
567                    remain -= 1;
568                    count += 1;
569                }
570                if batch.is_empty() {
571                    break;
572                }
573                hummock_sstable_info::Entity::insert_many(batch)
574                    .on_conflict_do_nothing()
575                    .exec(txn)
576                    .await?;
577            }
578            Ok(count)
579        }
580
581        let mut batch = vec![];
582        for (table_id, _cg_id, committed_epoch) in tables_to_commit {
583            let m = hummock_epoch_to_version::ActiveModel {
584                epoch: Set(committed_epoch.try_into().unwrap()),
585                table_id: Set(i64::from(table_id.as_raw_id())),
586                version_id: Set(delta.id),
587            };
588            batch.push(m);
589            if batch.len()
590                >= self
591                    .env
592                    .opts
593                    .hummock_time_travel_epoch_version_insert_batch_size
594            {
595                // There should be no conflict rows.
596                hummock_epoch_to_version::Entity::insert_many(std::mem::take(&mut batch))
597                    .do_nothing()
598                    .exec(txn)
599                    .await?;
600            }
601        }
602        if !batch.is_empty() {
603            // There should be no conflict rows.
604            hummock_epoch_to_version::Entity::insert_many(batch)
605                .do_nothing()
606                .exec(txn)
607                .await?;
608        }
609
610        let mut version_sst_ids = None;
611        if let Some(version) = version {
612            // `version_sst_ids` is used to update `last_time_travel_snapshot_sst_ids`.
613            version_sst_ids = Some(
614                version
615                    .get_sst_infos(true)
616                    .filter_map(|s| {
617                        if s.table_ids
618                            .iter()
619                            .any(|tid| time_travel_table_ids.contains(tid))
620                        {
621                            return Some(s.sst_id);
622                        }
623                        None
624                    })
625                    .collect(),
626            );
627            write_sstable_infos(
628                version.get_sst_infos(true).filter(|s| {
629                    !skip_sst_ids.contains(&s.sst_id)
630                        && s.table_ids
631                            .iter()
632                            .any(|tid| time_travel_table_ids.contains(tid))
633                }),
634                txn,
635                self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
636            )
637            .await?;
638            let m = hummock_time_travel_version::ActiveModel {
639                version_id: Set(version.id),
640                version: Set(
641                    (&IncompleteHummockVersion::from((version, &time_travel_table_ids))
642                        .to_protobuf())
643                        .into(),
644                ),
645            };
646            hummock_time_travel_version::Entity::insert(m)
647                .on_conflict_do_nothing()
648                .exec(txn)
649                .await?;
650            // Return early to skip persisting delta.
651            return Ok(version_sst_ids);
652        }
653        let written = write_sstable_infos(
654            delta.newly_added_sst_infos(true).filter(|s| {
655                !skip_sst_ids.contains(&s.sst_id)
656                    && s.table_ids
657                        .iter()
658                        .any(|tid| time_travel_table_ids.contains(tid))
659            }),
660            txn,
661            self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
662        )
663        .await?;
664        let has_state_table_info_delta = delta
665            .state_table_info_delta
666            .keys()
667            .any(|table_id| time_travel_table_ids.contains(table_id));
668        if written > 0 || has_state_table_info_delta {
669            let m = hummock_time_travel_delta::ActiveModel {
670                version_id: Set(delta.id),
671                version_delta: Set((&IncompleteHummockVersionDelta::from((
672                    &delta,
673                    &time_travel_table_ids,
674                ))
675                .to_protobuf())
676                    .into()),
677            };
678            hummock_time_travel_delta::Entity::insert(m)
679                .on_conflict_do_nothing()
680                .exec(txn)
681                .await?;
682        }
683
684        Ok(version_sst_ids)
685    }
686}
687
688/// The `HummockVersion` is actually `InHummockVersion`. It requires `refill_version`.
689fn replay_archive(
690    version: PbHummockVersion,
691    deltas: impl Iterator<Item = PbHummockVersionDelta>,
692) -> HummockVersion {
693    // The pb version ann pb version delta are actually written by InHummockVersion and InHummockVersionDelta, respectively.
694    // Using HummockVersion make it easier for `refill_version` later.
695    let mut last_version = HummockVersion::from_persisted_protobuf(&version);
696    for d in deltas {
697        let d = HummockVersionDelta::from_persisted_protobuf(&d);
698        debug_assert!(
699            !should_mark_next_time_travel_version_snapshot(&d),
700            "unexpected time travel delta {:?}",
701            d
702        );
703        // Need to work around the assertion in `apply_version_delta`.
704        // Because compaction deltas are not included in time travel archive.
705        while last_version.id < d.prev_id {
706            last_version.id += 1;
707        }
708        last_version.apply_version_delta(&d);
709    }
710    last_version
711}
712
713pub fn require_sql_meta_store_err() -> Error {
714    Error::TimeTravel(anyhow!("require SQL meta store"))
715}
716
717/// Time travel delta replay only expect `NewL0SubLevel`. In all other cases, a new version snapshot should be created.
718pub fn should_mark_next_time_travel_version_snapshot(delta: &HummockVersionDelta) -> bool {
719    delta.group_deltas.iter().any(|(_, deltas)| {
720        deltas
721            .group_deltas
722            .iter()
723            .any(|d| !matches!(d, GroupDeltaCommon::NewL0SubLevel(_)))
724    })
725}