risingwave_meta/hummock/manager/time_travel.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet, VecDeque};
16
17use anyhow::anyhow;
18use futures::TryStreamExt;
19use risingwave_common::catalog::TableId;
20use risingwave_common::system_param::reader::SystemParamsRead;
21use risingwave_common::util::epoch::Epoch;
22use risingwave_hummock_sdk::compaction_group::StateTableId;
23use risingwave_hummock_sdk::sstable_info::SstableInfo;
24use risingwave_hummock_sdk::time_travel::{
25    IncompleteHummockVersion, IncompleteHummockVersionDelta, refill_version,
26};
27use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion, HummockVersionDelta};
28use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, HummockObjectId, HummockSstableId};
29use risingwave_meta_model::hummock_sstable_info::SstableInfoV2Backend;
30use risingwave_meta_model::{
31    HummockVersionId, hummock_epoch_to_version, hummock_sstable_info, hummock_time_travel_delta,
32    hummock_time_travel_version,
33};
34use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta};
35use sea_orm::ActiveValue::Set;
36use sea_orm::{
37    ColumnTrait, Condition, DatabaseTransaction, EntityTrait, PaginatorTrait, QueryFilter,
38    QueryOrder, QuerySelect, TransactionTrait,
39};
40use tracing::info;
41
42use crate::hummock::HummockManager;
43use crate::hummock::error::{Error, Result};
44
45/// Time travel.
46impl HummockManager {
47    pub(crate) async fn init_time_travel_state(&self) -> Result<()> {
48        let sql_store = self.env.meta_store_ref();
49        let mut guard = self.versioning.write().await;
50        guard.mark_next_time_travel_version_snapshot();
51
52        guard.last_time_travel_snapshot_sst_ids = HashSet::new();
53        let Some(version) = hummock_time_travel_version::Entity::find()
54            .order_by_desc(hummock_time_travel_version::Column::VersionId)
55            .one(&sql_store.conn)
56            .await?
57            .map(|v| {
58                IncompleteHummockVersion::from_persisted_protobuf_owned(v.version.to_protobuf())
59            })
60        else {
61            return Ok(());
62        };
63        guard.last_time_travel_snapshot_sst_ids = version.get_sst_ids();
64        Ok(())
65    }
66
67    pub(crate) async fn truncate_time_travel_metadata(
68        &self,
69        epoch_watermark: HummockEpoch,
70    ) -> Result<()> {
71        let _timer = self
72            .metrics
73            .time_travel_vacuum_metadata_latency
74            .start_timer();
75        let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
76        let sql_store = self.env.meta_store_ref();
77        let txn = sql_store.conn.begin().await?;
78        let version_watermark = hummock_epoch_to_version::Entity::find()
79            .filter(
80                hummock_epoch_to_version::Column::Epoch
81                    .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
82            )
83            .order_by_desc(hummock_epoch_to_version::Column::Epoch)
84            .order_by_asc(hummock_epoch_to_version::Column::VersionId)
85            .one(&txn)
86            .await?;
87        let Some(version_watermark) = version_watermark else {
88            txn.commit().await?;
89            return Ok(());
90        };
91        // metadata BELOW watermark_version_id will be truncated.
92        let mut watermark_version_id =
93            std::cmp::min(version_watermark.version_id, min_pinned_version_id);
94        if let Some(max_version_count) = self.env.opts.time_travel_vacuum_max_version_count {
95            let earliest2_version_ids = hummock_time_travel_version::Entity::find()
96                .select_only()
97                .column(hummock_time_travel_version::Column::VersionId)
98                .order_by_asc(hummock_time_travel_version::Column::VersionId)
99                .limit(2)
100                .into_tuple::<HummockVersionId>()
101                .all(&txn)
102                .await?;
103            // Ensure at least 1 version BELOW watermark_version_id if applying time_travel_vacuum_max_version_count.
104            if earliest2_version_ids.len() == 2 {
105                watermark_version_id = std::cmp::min(
106                    watermark_version_id,
107                    HummockVersionId::new(std::cmp::max(
108                        earliest2_version_ids[0]
109                            .as_raw_id()
110                            .saturating_add(max_version_count.into()),
111                        earliest2_version_ids[1].as_raw_id(),
112                    )),
113                );
114            }
115        }
116        let res = hummock_epoch_to_version::Entity::delete_many()
117            .filter(
118                hummock_epoch_to_version::Column::Epoch
119                    .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
120            )
121            .exec(&txn)
122            .await?;
123        tracing::info!(
124            epoch_watermark,
125            "Delete {} rows from hummock_epoch_to_version.",
126            res.rows_affected
127        );
128        let latest_valid_version = hummock_time_travel_version::Entity::find()
129            .filter(hummock_time_travel_version::Column::VersionId.lte(watermark_version_id))
130            .order_by_desc(hummock_time_travel_version::Column::VersionId)
131            .one(&txn)
132            .await?
133            .map(|m| {
134                IncompleteHummockVersion::from_persisted_protobuf_owned(m.version.to_protobuf())
135            });
136        let Some(latest_valid_version) = latest_valid_version else {
137            txn.commit().await?;
138            return Ok(());
139        };
140        let (
141            latest_valid_version_id,
142            latest_valid_version_sst_ids,
143            latest_valid_version_object_ids,
144        ) = {
145            (
146                latest_valid_version.id,
147                latest_valid_version.get_sst_ids(),
148                latest_valid_version
149                    .get_object_ids()
150                    .collect::<HashSet<_>>(),
151            )
152        };
153        let mut object_ids_to_delete: HashSet<_> = HashSet::default();
154        let version_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
155            hummock_time_travel_version::Entity::find()
156                .select_only()
157                .column(hummock_time_travel_version::Column::VersionId)
158                .filter(hummock_time_travel_version::Column::VersionId.lt(latest_valid_version_id))
159                .order_by_desc(hummock_time_travel_version::Column::VersionId)
160                .into_tuple()
161                .all(&txn)
162                .await?;
163        let delta_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
164            hummock_time_travel_delta::Entity::find()
165                .select_only()
166                .column(hummock_time_travel_delta::Column::VersionId)
167                .filter(hummock_time_travel_delta::Column::VersionId.lt(latest_valid_version_id))
168                .into_tuple()
169                .all(&txn)
170                .await?;
171        let delete_sst_batch_size = self
172            .env
173            .opts
174            .hummock_time_travel_epoch_version_insert_batch_size;
175        let delta_fetch_batch_size = self
176            .env
177            .opts
178            .hummock_time_travel_delta_fetch_batch_size
179            .max(1);
180        let mut sst_ids_to_delete: HashSet<_> = HashSet::default();
181        async fn delete_sst_in_batch(
182            txn: &DatabaseTransaction,
183            sst_ids_to_delete: HashSet<HummockSstableId>,
184            delete_sst_batch_size: usize,
185        ) -> Result<()> {
186            for start_idx in 0..=(sst_ids_to_delete.len().saturating_sub(1) / delete_sst_batch_size)
187            {
188                hummock_sstable_info::Entity::delete_many()
189                    .filter(
190                        hummock_sstable_info::Column::SstId.is_in(
191                            sst_ids_to_delete
192                                .iter()
193                                .skip(start_idx * delete_sst_batch_size)
194                                .take(delete_sst_batch_size)
195                                .copied(),
196                        ),
197                    )
198                    .exec(txn)
199                    .await?;
200            }
201            Ok(())
202        }
203        for delta_id_batch in delta_ids_to_delete.chunks(delta_fetch_batch_size) {
204            let mut delta_to_delete_by_id: HashMap<_, _> =
205                hummock_time_travel_delta::Entity::find()
206                    .filter(
207                        hummock_time_travel_delta::Column::VersionId
208                            .is_in(delta_id_batch.iter().copied()),
209                    )
210                    .all(&txn)
211                    .await?
212                    .into_iter()
213                    .map(|delta| (delta.version_id, delta))
214                    .collect();
215            for &delta_id_to_delete in delta_id_batch {
216                let delta_to_delete = delta_to_delete_by_id
217                    .remove(&delta_id_to_delete)
218                    .ok_or_else(|| {
219                        Error::TimeTravel(anyhow!(format!(
220                            "version delta {} not found",
221                            delta_id_to_delete
222                        )))
223                    })?;
224                let delta_to_delete = IncompleteHummockVersionDelta::from_persisted_protobuf_owned(
225                    delta_to_delete.version_delta.to_protobuf(),
226                );
227                let new_sst_ids = delta_to_delete.newly_added_sst_ids(true);
228                // The SST ids added and then deleted by compaction between the 2 versions.
229                sst_ids_to_delete.extend(&new_sst_ids - &latest_valid_version_sst_ids);
230                if sst_ids_to_delete.len() >= delete_sst_batch_size {
231                    delete_sst_in_batch(
232                        &txn,
233                        std::mem::take(&mut sst_ids_to_delete),
234                        delete_sst_batch_size,
235                    )
236                    .await?;
237                }
238                let new_object_ids = delta_to_delete.newly_added_object_ids(true);
239                object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
240            }
241        }
242        let mut next_version_sst_ids = latest_valid_version_sst_ids;
243        for prev_version_id in version_ids_to_delete {
244            let prev_version = {
245                let prev_version = hummock_time_travel_version::Entity::find_by_id(prev_version_id)
246                    .one(&txn)
247                    .await?
248                    .ok_or_else(|| {
249                        Error::TimeTravel(anyhow!(format!(
250                            "prev_version {} not found",
251                            prev_version_id
252                        )))
253                    })?;
254                IncompleteHummockVersion::from_persisted_protobuf_owned(
255                    prev_version.version.to_protobuf(),
256                )
257            };
258            let sst_ids = prev_version.get_sst_ids();
259            // The SST ids deleted by compaction between the 2 versions.
260            sst_ids_to_delete.extend(&sst_ids - &next_version_sst_ids);
261            if sst_ids_to_delete.len() >= delete_sst_batch_size {
262                delete_sst_in_batch(
263                    &txn,
264                    std::mem::take(&mut sst_ids_to_delete),
265                    delete_sst_batch_size,
266                )
267                .await?;
268            }
269            let new_object_ids: HashSet<_> = prev_version.get_object_ids().collect();
270            object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
271            next_version_sst_ids = sst_ids;
272        }
273        if !sst_ids_to_delete.is_empty() {
274            delete_sst_in_batch(&txn, sst_ids_to_delete, delete_sst_batch_size).await?;
275        }
276
277        if !object_ids_to_delete.is_empty() {
278            // IMPORTANT: object_ids_to_delete may include objects that are still being used by SSTs not included in time travel metadata.
279            // So it's crucial to filter out those objects before actually deleting them, i.e. when using `try_take_may_delete_object_ids`.
280            self.gc_manager
281                .add_may_delete_object_ids(object_ids_to_delete.into_iter());
282        }
283
284        let res = hummock_time_travel_version::Entity::delete_many()
285            .filter(hummock_time_travel_version::Column::VersionId.lt(latest_valid_version_id))
286            .exec(&txn)
287            .await?;
288        tracing::info!(
289            %watermark_version_id,
290            %latest_valid_version_id,
291            "Deleted {} rows from hummock_time_travel_version.",
292            res.rows_affected
293        );
294
295        let res = hummock_time_travel_delta::Entity::delete_many()
296            .filter(hummock_time_travel_delta::Column::VersionId.lt(latest_valid_version_id))
297            .exec(&txn)
298            .await?;
299        tracing::info!(
300            %watermark_version_id,
301            %latest_valid_version_id,
302            "Deleted {} rows from hummock_time_travel_delta.",
303            res.rows_affected
304        );
305
306        txn.commit().await?;
307        Ok(())
308    }
309
310    pub(crate) async fn filter_out_objects_by_time_travel_v1(
311        &self,
312        objects: impl Iterator<Item = HummockObjectId>,
313    ) -> Result<HashSet<HummockObjectId>> {
314        let batch_size = self
315            .env
316            .opts
317            .hummock_time_travel_filter_out_objects_batch_size;
318        info!("filter out objects by time travel v1, only sst will remain in the result set");
319        // The input object count is much smaller than time travel pinned object count in meta store.
320        // So search input object in meta store.
321        let mut result: HashSet<_> = objects
322            .filter(|object_id| match object_id {
323                HummockObjectId::Sstable(_) => true,
324                HummockObjectId::VectorFile(_) | HummockObjectId::HnswGraphFile(_) => false,
325            })
326            .collect();
327        let mut remain_sst: VecDeque<_> = result.iter().copied().collect();
328        while !remain_sst.is_empty() {
329            let batch = remain_sst
330                .drain(..std::cmp::min(remain_sst.len(), batch_size))
331                .map(|object_id| object_id.as_raw());
332            let reject_object_ids: Vec<risingwave_meta_model::HummockSstableObjectId> =
333                hummock_sstable_info::Entity::find()
334                    .filter(hummock_sstable_info::Column::ObjectId.is_in(batch))
335                    .select_only()
336                    .column(hummock_sstable_info::Column::ObjectId)
337                    .into_tuple()
338                    .all(&self.env.meta_store_ref().conn)
339                    .await?;
340            for reject in reject_object_ids {
341                let object_id = HummockObjectId::Sstable(reject);
342                result.remove(&object_id);
343            }
344        }
345        Ok(result)
346    }
347
348    pub(crate) async fn filter_out_objects_by_time_travel(
349        &self,
350        objects: impl Iterator<Item = HummockObjectId>,
351    ) -> Result<HashSet<HummockObjectId>> {
352        if self.env.opts.hummock_time_travel_filter_out_objects_v1 {
353            return self.filter_out_objects_by_time_travel_v1(objects).await;
354        }
355        let mut result: HashSet<_> = objects.collect();
356
357        // filtered out object id pinned by time travel hummock version
358        {
359            let mut prev_version_id: Option<HummockVersionId> = None;
360            loop {
361                let query = hummock_time_travel_version::Entity::find();
362                let query = if let Some(prev_version_id) = prev_version_id {
363                    query.filter(hummock_time_travel_version::Column::VersionId.gt(prev_version_id))
364                } else {
365                    query
366                };
367                let mut version_stream = query
368                    .order_by_asc(hummock_time_travel_version::Column::VersionId)
369                    .limit(
370                        self.env
371                            .opts
372                            .hummock_time_travel_filter_out_objects_list_version_batch_size
373                            as u64,
374                    )
375                    .stream(&self.env.meta_store_ref().conn)
376                    .await?;
377                let mut next_prev_version_id = None;
378                while let Some(model) = version_stream.try_next().await? {
379                    let version =
380                        HummockVersion::from_persisted_protobuf_owned(model.version.to_protobuf());
381                    for object_id in version.get_object_ids() {
382                        result.remove(&object_id);
383                    }
384                    next_prev_version_id = Some(model.version_id);
385                }
386                if let Some(next_prev_version_id) = next_prev_version_id {
387                    prev_version_id = Some(next_prev_version_id);
388                } else {
389                    break;
390                }
391            }
392        }
393
394        // filtered out object ids pinned by time travel hummock version delta
395        {
396            let mut prev_version_id: Option<HummockVersionId> = None;
397            loop {
398                let query = hummock_time_travel_delta::Entity::find();
399                let query = if let Some(prev_version_id) = prev_version_id {
400                    query.filter(hummock_time_travel_delta::Column::VersionId.gt(prev_version_id))
401                } else {
402                    query
403                };
404                let mut version_stream = query
405                    .order_by_asc(hummock_time_travel_delta::Column::VersionId)
406                    .limit(
407                        self.env
408                            .opts
409                            .hummock_time_travel_filter_out_objects_list_delta_batch_size
410                            as u64,
411                    )
412                    .stream(&self.env.meta_store_ref().conn)
413                    .await?;
414                let mut next_prev_version_id = None;
415                while let Some(model) = version_stream.try_next().await? {
416                    let version_delta = HummockVersionDelta::from_persisted_protobuf_owned(
417                        model.version_delta.to_protobuf(),
418                    );
419                    // set exclude_table_change_log to true because in time travel delta we ignore the table change log
420                    for object_id in version_delta.newly_added_object_ids(true) {
421                        result.remove(&object_id);
422                    }
423                    next_prev_version_id = Some(model.version_id);
424                }
425                if let Some(next_prev_version_id) = next_prev_version_id {
426                    prev_version_id = Some(next_prev_version_id);
427                } else {
428                    break;
429                }
430            }
431        }
432
433        Ok(result)
434    }
435
436    pub(crate) async fn time_travel_pinned_object_count(&self) -> Result<u64> {
437        let count = hummock_sstable_info::Entity::find()
438            .count(&self.env.meta_store_ref().conn)
439            .await?;
440        Ok(count)
441    }
442
443    /// Attempt to locate the version corresponding to `query_epoch`.
444    ///
445    /// The version is retrieved from `hummock_epoch_to_version`, selecting the entry with the largest epoch that's lte `query_epoch`.
446    ///
447    /// The resulted version is complete, i.e. with correct `SstableInfo`.
448    pub async fn epoch_to_version(
449        &self,
450        query_epoch: HummockEpoch,
451        table_id: TableId,
452    ) -> Result<HummockVersion> {
453        let sql_store = self.env.meta_store_ref();
454        let _permit = self.inflight_time_travel_query.try_acquire().map_err(|_| {
455            anyhow!(format!(
456                "too many inflight time travel queries, max_inflight_time_travel_query={}",
457                self.env.opts.max_inflight_time_travel_query
458            ))
459        })?;
460        let epoch_to_version = hummock_epoch_to_version::Entity::find()
461            .filter(
462                Condition::any()
463                    .add(
464                        hummock_epoch_to_version::Column::TableId
465                            .eq(i64::from(table_id.as_raw_id())),
466                    )
467                    // for backward compatibility
468                    .add(hummock_epoch_to_version::Column::TableId.eq(0)),
469            )
470            .filter(
471                hummock_epoch_to_version::Column::Epoch
472                    .lte(risingwave_meta_model::Epoch::try_from(query_epoch).unwrap()),
473            )
474            .order_by_desc(hummock_epoch_to_version::Column::Epoch)
475            .one(&sql_store.conn)
476            .await?
477            .ok_or_else(|| {
478                Error::TimeTravel(anyhow!(format!(
479                    "version not found for epoch {}",
480                    query_epoch
481                )))
482            })?;
483        let timer = self
484            .metrics
485            .time_travel_version_replay_latency
486            .start_timer();
487        let actual_version_id = epoch_to_version.version_id;
488        tracing::debug!(
489            query_epoch,
490            query_tz = ?(Epoch(query_epoch).as_timestamptz()),
491            actual_epoch = epoch_to_version.epoch,
492            actual_tz = ?(Epoch(u64::try_from(epoch_to_version.epoch).unwrap()).as_timestamptz()),
493            %actual_version_id,
494            "convert query epoch"
495        );
496
497        let replay_version = hummock_time_travel_version::Entity::find()
498            .filter(hummock_time_travel_version::Column::VersionId.lte(actual_version_id))
499            .order_by_desc(hummock_time_travel_version::Column::VersionId)
500            .one(&sql_store.conn)
501            .await?
502            .ok_or_else(|| {
503                Error::TimeTravel(anyhow!(format!(
504                    "no replay version found for epoch {}, version {}",
505                    query_epoch, actual_version_id,
506                )))
507            })?;
508        let deltas = hummock_time_travel_delta::Entity::find()
509            .filter(hummock_time_travel_delta::Column::VersionId.gt(replay_version.version_id))
510            .filter(hummock_time_travel_delta::Column::VersionId.lte(actual_version_id))
511            .order_by_asc(hummock_time_travel_delta::Column::VersionId)
512            .all(&sql_store.conn)
513            .await?;
514        // SstableInfo in actual_version is incomplete before refill_version.
515        let mut actual_version = replay_archive(
516            replay_version.version.to_protobuf(),
517            deltas.into_iter().map(|d| d.version_delta.to_protobuf()),
518        );
519
520        let mut sst_ids = actual_version
521            .get_sst_ids()
522            .into_iter()
523            .collect::<VecDeque<_>>();
524        let sst_count = sst_ids.len();
525        let mut sst_id_to_info = HashMap::with_capacity(sst_count);
526        let sst_info_fetch_batch_size = self.env.opts.hummock_time_travel_sst_info_fetch_batch_size;
527        while !sst_ids.is_empty() {
528            let sst_infos = hummock_sstable_info::Entity::find()
529                .filter(hummock_sstable_info::Column::SstId.is_in(
530                    sst_ids.drain(..std::cmp::min(sst_info_fetch_batch_size, sst_ids.len())),
531                ))
532                .all(&sql_store.conn)
533                .await?;
534            for sst_info in sst_infos {
535                let sst_info: SstableInfo = sst_info.sstable_info.to_protobuf().into();
536                sst_id_to_info.insert(sst_info.sst_id, sst_info);
537            }
538        }
539        if sst_count != sst_id_to_info.len() {
540            return Err(Error::TimeTravel(anyhow!(format!(
541                "some SstableInfos not found for epoch {}, version {}",
542                query_epoch, actual_version_id,
543            ))));
544        }
545        refill_version(&mut actual_version, &sst_id_to_info, table_id);
546        timer.observe_duration();
547        Ok(actual_version)
548    }
549
550    pub(crate) async fn write_time_travel_metadata(
551        &self,
552        txn: &DatabaseTransaction,
553        version: Option<&HummockVersion>,
554        delta: HummockVersionDelta,
555        time_travel_table_ids: HashSet<StateTableId>,
556        skip_sst_ids: &HashSet<HummockSstableId>,
557        tables_to_commit: impl Iterator<Item = (&TableId, &CompactionGroupId, u64)>,
558    ) -> Result<Option<HashSet<HummockSstableId>>> {
559        let _timer = self
560            .metrics
561            .time_travel_write_metadata_latency
562            .start_timer();
563        if self
564            .env
565            .system_params_reader()
566            .await
567            .time_travel_retention_ms()
568            == 0
569        {
570            return Ok(None);
571        }
572        async fn write_sstable_infos(
573            mut sst_infos: impl Iterator<Item = &SstableInfo>,
574            txn: &DatabaseTransaction,
575            batch_size: usize,
576        ) -> Result<usize> {
577            let mut count = 0;
578            let mut is_finished = false;
579            while !is_finished {
580                let mut remain = batch_size;
581                let mut batch = vec![];
582                while remain > 0 {
583                    let Some(sst_info) = sst_infos.next() else {
584                        is_finished = true;
585                        break;
586                    };
587                    batch.push(hummock_sstable_info::ActiveModel {
588                        sst_id: Set(sst_info.sst_id),
589                        object_id: Set(sst_info.object_id),
590                        sstable_info: Set(SstableInfoV2Backend::from(&sst_info.to_protobuf())),
591                    });
592                    remain -= 1;
593                    count += 1;
594                }
595                if batch.is_empty() {
596                    break;
597                }
598                hummock_sstable_info::Entity::insert_many(batch)
599                    .on_conflict_do_nothing()
600                    .exec(txn)
601                    .await?;
602            }
603            Ok(count)
604        }
605
606        let mut batch = vec![];
607        for (table_id, _cg_id, committed_epoch) in tables_to_commit {
608            let m = hummock_epoch_to_version::ActiveModel {
609                epoch: Set(committed_epoch.try_into().unwrap()),
610                table_id: Set(i64::from(table_id.as_raw_id())),
611                version_id: Set(delta.id),
612            };
613            batch.push(m);
614            if batch.len()
615                >= self
616                    .env
617                    .opts
618                    .hummock_time_travel_epoch_version_insert_batch_size
619            {
620                // There should be no conflict rows.
621                hummock_epoch_to_version::Entity::insert_many(std::mem::take(&mut batch))
622                    .do_nothing()
623                    .exec(txn)
624                    .await?;
625            }
626        }
627        if !batch.is_empty() {
628            // There should be no conflict rows.
629            hummock_epoch_to_version::Entity::insert_many(batch)
630                .do_nothing()
631                .exec(txn)
632                .await?;
633        }
634
635        let mut version_sst_ids = None;
636        if let Some(version) = version {
637            // `version_sst_ids` is used to update `last_time_travel_snapshot_sst_ids`.
638            version_sst_ids = Some(
639                version
640                    .get_sst_infos()
641                    .filter_map(|s| {
642                        if s.table_ids
643                            .iter()
644                            .any(|tid| time_travel_table_ids.contains(tid))
645                        {
646                            return Some(s.sst_id);
647                        }
648                        None
649                    })
650                    .collect(),
651            );
652            write_sstable_infos(
653                version.get_sst_infos().filter(|s| {
654                    !skip_sst_ids.contains(&s.sst_id)
655                        && s.table_ids
656                            .iter()
657                            .any(|tid| time_travel_table_ids.contains(tid))
658                }),
659                txn,
660                self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
661            )
662            .await?;
663            let m = hummock_time_travel_version::ActiveModel {
664                version_id: Set(version.id),
665                version: Set(
666                    (&IncompleteHummockVersion::from((version, &time_travel_table_ids))
667                        .to_protobuf())
668                        .into(),
669                ),
670            };
671            hummock_time_travel_version::Entity::insert(m)
672                .on_conflict_do_nothing()
673                .exec(txn)
674                .await?;
675            // Return early to skip persisting delta.
676            return Ok(version_sst_ids);
677        }
678        let written = write_sstable_infos(
679            delta.newly_added_sst_infos(true).filter(|s| {
680                !skip_sst_ids.contains(&s.sst_id)
681                    && s.table_ids
682                        .iter()
683                        .any(|tid| time_travel_table_ids.contains(tid))
684            }),
685            txn,
686            self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
687        )
688        .await?;
689        let has_state_table_info_delta = delta
690            .state_table_info_delta
691            .keys()
692            .any(|table_id| time_travel_table_ids.contains(table_id));
693        if written > 0 || has_state_table_info_delta {
694            let m = hummock_time_travel_delta::ActiveModel {
695                version_id: Set(delta.id),
696                version_delta: Set((&IncompleteHummockVersionDelta::from((
697                    &delta,
698                    &time_travel_table_ids,
699                ))
700                .to_protobuf())
701                    .into()),
702            };
703            hummock_time_travel_delta::Entity::insert(m)
704                .on_conflict_do_nothing()
705                .exec(txn)
706                .await?;
707        }
708
709        Ok(version_sst_ids)
710    }
711}
712
713/// The `HummockVersion` is actually `InHummockVersion`. It requires `refill_version`.
714fn replay_archive(
715    version: PbHummockVersion,
716    deltas: impl Iterator<Item = PbHummockVersionDelta>,
717) -> HummockVersion {
718    // The pb version ann pb version delta are actually written by InHummockVersion and InHummockVersionDelta, respectively.
719    // Using HummockVersion make it easier for `refill_version` later.
720    let mut last_version = HummockVersion::from_persisted_protobuf_owned(version);
721    for d in deltas {
722        let d = HummockVersionDelta::from_persisted_protobuf_owned(d);
723        debug_assert!(
724            !should_mark_next_time_travel_version_snapshot(&d),
725            "unexpected time travel delta {:?}",
726            d
727        );
728        // Need to work around the assertion in `apply_version_delta`.
729        // Because compaction deltas are not included in time travel archive.
730        while last_version.id < d.prev_id {
731            last_version.id += 1;
732        }
733        last_version.apply_version_delta(&d);
734    }
735    last_version
736}
737
738pub fn require_sql_meta_store_err() -> Error {
739    Error::TimeTravel(anyhow!("require SQL meta store"))
740}
741
742/// Time travel delta replay only expect `NewL0SubLevel`. In all other cases, a new version snapshot should be created.
743pub fn should_mark_next_time_travel_version_snapshot(delta: &HummockVersionDelta) -> bool {
744    delta.group_deltas.iter().any(|(_, deltas)| {
745        deltas
746            .group_deltas
747            .iter()
748            .any(|d| !matches!(d, GroupDeltaCommon::NewL0SubLevel(_)))
749    })
750}