Skip to main content

risingwave_meta/hummock/manager/
time_travel.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::collections::{HashMap, HashSet, VecDeque};
16
17use anyhow::anyhow;
18use futures::TryStreamExt;
19use risingwave_common::catalog::TableId;
20use risingwave_common::system_param::reader::SystemParamsRead;
21use risingwave_common::util::epoch::Epoch;
22use risingwave_hummock_sdk::compaction_group::StateTableId;
23use risingwave_hummock_sdk::sstable_info::SstableInfo;
24use risingwave_hummock_sdk::time_travel::{
25    IncompleteHummockVersion, IncompleteHummockVersionDelta, refill_version,
26};
27use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion, HummockVersionDelta};
28use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, HummockObjectId, HummockSstableId};
29use risingwave_meta_model::hummock_sstable_info::SstableInfoV2Backend;
30use risingwave_meta_model::{
31    HummockVersionId, hummock_epoch_to_version, hummock_sstable_info, hummock_time_travel_delta,
32    hummock_time_travel_version,
33};
34use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta};
35use sea_orm::ActiveValue::Set;
36use sea_orm::{
37    ColumnTrait, Condition, DatabaseTransaction, EntityTrait, PaginatorTrait, QueryFilter,
38    QueryOrder, QuerySelect, TransactionTrait,
39};
40use tracing::info;
41
42use crate::hummock::HummockManager;
43use crate::hummock::error::{Error, Result};
44
45/// Time travel.
46impl HummockManager {
47    pub(crate) async fn init_time_travel_state(&self) -> Result<()> {
48        let sql_store = self.env.meta_store_ref();
49        let mut guard = self.versioning.write().await;
50        guard.mark_next_time_travel_version_snapshot();
51
52        guard.last_time_travel_snapshot_sst_ids = HashSet::new();
53        let Some(version) = hummock_time_travel_version::Entity::find()
54            .order_by_desc(hummock_time_travel_version::Column::VersionId)
55            .one(&sql_store.conn)
56            .await?
57            .map(|v| {
58                IncompleteHummockVersion::from_persisted_protobuf_owned(v.version.to_protobuf())
59            })
60        else {
61            return Ok(());
62        };
63        guard.last_time_travel_snapshot_sst_ids = version.get_sst_ids();
64        Ok(())
65    }
66
67    pub(crate) async fn truncate_time_travel_metadata(
68        &self,
69        epoch_watermark: HummockEpoch,
70    ) -> Result<()> {
71        let _timer = self
72            .metrics
73            .time_travel_vacuum_metadata_latency
74            .start_timer();
75        let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
76        let sql_store = self.env.meta_store_ref();
77        let txn = sql_store.conn.begin().await?;
78        let version_watermark = hummock_epoch_to_version::Entity::find()
79            .filter(
80                hummock_epoch_to_version::Column::Epoch
81                    .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
82            )
83            .order_by_desc(hummock_epoch_to_version::Column::Epoch)
84            .order_by_asc(hummock_epoch_to_version::Column::VersionId)
85            .one(&txn)
86            .await?;
87        let Some(version_watermark) = version_watermark else {
88            txn.commit().await?;
89            return Ok(());
90        };
91        // metadata BELOW watermark_version_id will be truncated.
92        let mut watermark_version_id =
93            std::cmp::min(version_watermark.version_id, min_pinned_version_id);
94        if let Some(max_version_count) = self.env.opts.time_travel_vacuum_max_version_count {
95            let earliest2_version_ids = hummock_time_travel_version::Entity::find()
96                .select_only()
97                .column(hummock_time_travel_version::Column::VersionId)
98                .order_by_asc(hummock_time_travel_version::Column::VersionId)
99                .limit(2)
100                .into_tuple::<HummockVersionId>()
101                .all(&txn)
102                .await?;
103            // Ensure at least 1 version BELOW watermark_version_id if applying time_travel_vacuum_max_version_count.
104            if earliest2_version_ids.len() == 2 {
105                watermark_version_id = std::cmp::min(
106                    watermark_version_id,
107                    HummockVersionId::new(std::cmp::max(
108                        earliest2_version_ids[0]
109                            .as_raw_id()
110                            .saturating_add(max_version_count.into()),
111                        earliest2_version_ids[1].as_raw_id(),
112                    )),
113                );
114            }
115        }
116        let res = hummock_epoch_to_version::Entity::delete_many()
117            .filter(
118                hummock_epoch_to_version::Column::Epoch
119                    .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
120            )
121            .exec(&txn)
122            .await?;
123        tracing::info!(
124            epoch_watermark,
125            "Delete {} rows from hummock_epoch_to_version.",
126            res.rows_affected
127        );
128        let latest_valid_version = hummock_time_travel_version::Entity::find()
129            .filter(hummock_time_travel_version::Column::VersionId.lte(watermark_version_id))
130            .order_by_desc(hummock_time_travel_version::Column::VersionId)
131            .one(&txn)
132            .await?
133            .map(|m| {
134                IncompleteHummockVersion::from_persisted_protobuf_owned(m.version.to_protobuf())
135            });
136        let Some(latest_valid_version) = latest_valid_version else {
137            txn.commit().await?;
138            return Ok(());
139        };
140        let (
141            latest_valid_version_id,
142            latest_valid_version_sst_ids,
143            latest_valid_version_object_ids,
144        ) = {
145            (
146                latest_valid_version.id,
147                latest_valid_version.get_sst_ids(),
148                latest_valid_version
149                    .get_object_ids()
150                    .collect::<HashSet<_>>(),
151            )
152        };
153        let mut object_ids_to_delete: HashSet<_> = HashSet::default();
154        let version_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
155            hummock_time_travel_version::Entity::find()
156                .select_only()
157                .column(hummock_time_travel_version::Column::VersionId)
158                .filter(hummock_time_travel_version::Column::VersionId.lt(latest_valid_version_id))
159                .order_by_desc(hummock_time_travel_version::Column::VersionId)
160                .into_tuple()
161                .all(&txn)
162                .await?;
163        let delta_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
164            hummock_time_travel_delta::Entity::find()
165                .select_only()
166                .column(hummock_time_travel_delta::Column::VersionId)
167                .filter(hummock_time_travel_delta::Column::VersionId.lt(latest_valid_version_id))
168                .into_tuple()
169                .all(&txn)
170                .await?;
171        let delete_sst_batch_size = self
172            .env
173            .opts
174            .hummock_time_travel_epoch_version_insert_batch_size;
175        let delta_fetch_batch_size = self
176            .env
177            .opts
178            .hummock_time_travel_delta_fetch_batch_size
179            .max(1);
180        let mut sst_ids_to_delete: HashSet<_> = HashSet::default();
181        async fn delete_sst_in_batch(
182            txn: &DatabaseTransaction,
183            sst_ids_to_delete: HashSet<HummockSstableId>,
184            delete_sst_batch_size: usize,
185        ) -> Result<()> {
186            for start_idx in 0..=(sst_ids_to_delete.len().saturating_sub(1) / delete_sst_batch_size)
187            {
188                hummock_sstable_info::Entity::delete_many()
189                    .filter(
190                        hummock_sstable_info::Column::SstId.is_in(
191                            sst_ids_to_delete
192                                .iter()
193                                .skip(start_idx * delete_sst_batch_size)
194                                .take(delete_sst_batch_size)
195                                .copied(),
196                        ),
197                    )
198                    .exec(txn)
199                    .await?;
200            }
201            Ok(())
202        }
203        for delta_id_batch in delta_ids_to_delete.chunks(delta_fetch_batch_size) {
204            let mut delta_to_delete_by_id: HashMap<_, _> =
205                hummock_time_travel_delta::Entity::find()
206                    .filter(
207                        hummock_time_travel_delta::Column::VersionId
208                            .is_in(delta_id_batch.iter().copied()),
209                    )
210                    .all(&txn)
211                    .await?
212                    .into_iter()
213                    .map(|delta| (delta.version_id, delta))
214                    .collect();
215            for &delta_id_to_delete in delta_id_batch {
216                let delta_to_delete = delta_to_delete_by_id
217                    .remove(&delta_id_to_delete)
218                    .ok_or_else(|| {
219                        Error::TimeTravel(anyhow!(format!(
220                            "version delta {} not found",
221                            delta_id_to_delete
222                        )))
223                    })?;
224                let delta_to_delete = IncompleteHummockVersionDelta::from_persisted_protobuf_owned(
225                    delta_to_delete.version_delta.to_protobuf(),
226                );
227                let new_sst_ids = delta_to_delete.newly_added_sst_ids(true);
228                // The SST ids added and then deleted by compaction between the 2 versions.
229                sst_ids_to_delete.extend(&new_sst_ids - &latest_valid_version_sst_ids);
230                if sst_ids_to_delete.len() >= delete_sst_batch_size {
231                    delete_sst_in_batch(
232                        &txn,
233                        std::mem::take(&mut sst_ids_to_delete),
234                        delete_sst_batch_size,
235                    )
236                    .await?;
237                }
238                let new_object_ids = delta_to_delete.newly_added_object_ids(true);
239                object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
240            }
241        }
242        let mut next_version_sst_ids = latest_valid_version_sst_ids;
243        for prev_version_id in version_ids_to_delete {
244            let prev_version = {
245                let prev_version = hummock_time_travel_version::Entity::find_by_id(prev_version_id)
246                    .one(&txn)
247                    .await?
248                    .ok_or_else(|| {
249                        Error::TimeTravel(anyhow!(format!(
250                            "prev_version {} not found",
251                            prev_version_id
252                        )))
253                    })?;
254                IncompleteHummockVersion::from_persisted_protobuf_owned(
255                    prev_version.version.to_protobuf(),
256                )
257            };
258            let sst_ids = prev_version.get_sst_ids();
259            // The SST ids deleted by compaction between the 2 versions.
260            sst_ids_to_delete.extend(&sst_ids - &next_version_sst_ids);
261            if sst_ids_to_delete.len() >= delete_sst_batch_size {
262                delete_sst_in_batch(
263                    &txn,
264                    std::mem::take(&mut sst_ids_to_delete),
265                    delete_sst_batch_size,
266                )
267                .await?;
268            }
269            let new_object_ids: HashSet<_> = prev_version.get_object_ids().collect();
270            object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
271            next_version_sst_ids = sst_ids;
272        }
273        if !sst_ids_to_delete.is_empty() {
274            delete_sst_in_batch(&txn, sst_ids_to_delete, delete_sst_batch_size).await?;
275        }
276
277        if !object_ids_to_delete.is_empty() {
278            // IMPORTANT: object_ids_to_delete may include objects that are still being used by SSTs not included in time travel metadata.
279            // So it's crucial to filter out those objects before actually deleting them, i.e. when using `try_take_may_delete_object_ids`.
280            self.gc_manager
281                .add_may_delete_object_ids(object_ids_to_delete.into_iter());
282        }
283
284        let res = hummock_time_travel_version::Entity::delete_many()
285            .filter(hummock_time_travel_version::Column::VersionId.lt(latest_valid_version_id))
286            .exec(&txn)
287            .await?;
288        tracing::info!(
289            %watermark_version_id,
290            %latest_valid_version_id,
291            "Deleted {} rows from hummock_time_travel_version.",
292            res.rows_affected
293        );
294
295        let res = hummock_time_travel_delta::Entity::delete_many()
296            .filter(hummock_time_travel_delta::Column::VersionId.lt(latest_valid_version_id))
297            .exec(&txn)
298            .await?;
299        tracing::info!(
300            %watermark_version_id,
301            %latest_valid_version_id,
302            "Deleted {} rows from hummock_time_travel_delta.",
303            res.rows_affected
304        );
305
306        txn.commit().await?;
307        Ok(())
308    }
309
310    pub(crate) async fn filter_out_objects_by_time_travel_v1(
311        &self,
312        objects: impl Iterator<Item = HummockObjectId>,
313    ) -> Result<HashSet<HummockObjectId>> {
314        let batch_size = self
315            .env
316            .opts
317            .hummock_time_travel_filter_out_objects_batch_size;
318        info!("filter out objects by time travel v1, only sst will remain in the result set");
319        // The input object count is much smaller than time travel pinned object count in meta store.
320        // So search input object in meta store.
321        let mut result: HashSet<_> = objects
322            .filter(|object_id| match object_id {
323                HummockObjectId::Sstable(_) => true,
324                HummockObjectId::VectorFile(_) | HummockObjectId::HnswGraphFile(_) => false,
325            })
326            .collect();
327        let mut remain_sst: VecDeque<_> = result.iter().copied().collect();
328        while !remain_sst.is_empty() {
329            let batch = remain_sst
330                .drain(..std::cmp::min(remain_sst.len(), batch_size))
331                .map(|object_id| object_id.as_raw());
332            let reject_object_ids: Vec<risingwave_meta_model::HummockSstableObjectId> =
333                hummock_sstable_info::Entity::find()
334                    .filter(hummock_sstable_info::Column::ObjectId.is_in(batch))
335                    .select_only()
336                    .column(hummock_sstable_info::Column::ObjectId)
337                    .into_tuple()
338                    .all(&self.env.meta_store_ref().conn)
339                    .await?;
340            for reject in reject_object_ids {
341                let object_id = HummockObjectId::Sstable(reject);
342                result.remove(&object_id);
343            }
344        }
345        Ok(result)
346    }
347
348    pub(crate) async fn filter_out_objects_by_time_travel(
349        &self,
350        objects: impl Iterator<Item = HummockObjectId>,
351    ) -> Result<HashSet<HummockObjectId>> {
352        if self.env.opts.hummock_time_travel_filter_out_objects_v1 {
353            return self.filter_out_objects_by_time_travel_v1(objects).await;
354        }
355        let mut result: HashSet<_> = objects.collect();
356
357        // filtered out object id pinned by time travel hummock version
358        {
359            let mut prev_version_id: Option<HummockVersionId> = None;
360            loop {
361                let query = hummock_time_travel_version::Entity::find();
362                let query = if let Some(prev_version_id) = prev_version_id {
363                    query.filter(hummock_time_travel_version::Column::VersionId.gt(prev_version_id))
364                } else {
365                    query
366                };
367                let mut version_stream = query
368                    .order_by_asc(hummock_time_travel_version::Column::VersionId)
369                    .limit(
370                        self.env
371                            .opts
372                            .hummock_time_travel_filter_out_objects_list_version_batch_size
373                            as u64,
374                    )
375                    .stream(&self.env.meta_store_ref().conn)
376                    .await?;
377                let mut next_prev_version_id = None;
378                while let Some(model) = version_stream.try_next().await? {
379                    let version =
380                        HummockVersion::from_persisted_protobuf_owned(model.version.to_protobuf());
381                    for object_id in version.get_object_ids() {
382                        result.remove(&object_id);
383                    }
384                    next_prev_version_id = Some(model.version_id);
385                }
386                if let Some(next_prev_version_id) = next_prev_version_id {
387                    prev_version_id = Some(next_prev_version_id);
388                } else {
389                    break;
390                }
391            }
392        }
393
394        // filtered out object ids pinned by time travel hummock version delta
395        {
396            let mut prev_version_id: Option<HummockVersionId> = None;
397            loop {
398                let query = hummock_time_travel_delta::Entity::find();
399                let query = if let Some(prev_version_id) = prev_version_id {
400                    query.filter(hummock_time_travel_delta::Column::VersionId.gt(prev_version_id))
401                } else {
402                    query
403                };
404                let mut version_stream = query
405                    .order_by_asc(hummock_time_travel_delta::Column::VersionId)
406                    .limit(
407                        self.env
408                            .opts
409                            .hummock_time_travel_filter_out_objects_list_delta_batch_size
410                            as u64,
411                    )
412                    .stream(&self.env.meta_store_ref().conn)
413                    .await?;
414                let mut next_prev_version_id = None;
415                while let Some(model) = version_stream.try_next().await? {
416                    let version_delta = HummockVersionDelta::from_persisted_protobuf_owned(
417                        model.version_delta.to_protobuf(),
418                    );
419                    // set exclude_table_change_log to true because in time travel delta we ignore the table change log
420                    for object_id in version_delta.newly_added_object_ids(true) {
421                        result.remove(&object_id);
422                    }
423                    next_prev_version_id = Some(model.version_id);
424                }
425                if let Some(next_prev_version_id) = next_prev_version_id {
426                    prev_version_id = Some(next_prev_version_id);
427                } else {
428                    break;
429                }
430            }
431        }
432
433        Ok(result)
434    }
435
436    pub(crate) async fn time_travel_pinned_object_count(&self) -> Result<u64> {
437        let count = hummock_sstable_info::Entity::find()
438            .count(&self.env.meta_store_ref().conn)
439            .await?;
440        Ok(count)
441    }
442
443    /// Attempt to locate the version corresponding to `query_epoch`.
444    ///
445    /// The version is retrieved from `hummock_epoch_to_version`, selecting the entry with the largest epoch that's lte `query_epoch`.
446    ///
447    /// The resulted version is complete, i.e. with correct `SstableInfo`.
448    pub async fn epoch_to_version(
449        &self,
450        query_epoch: HummockEpoch,
451        table_id: TableId,
452    ) -> Result<HummockVersion> {
453        let sql_store = self.env.meta_store_ref();
454        let _permit = self.inflight_time_travel_query.try_acquire().map_err(|_| {
455            anyhow!(format!(
456                "too many inflight time travel queries, max_inflight_time_travel_query={}",
457                self.env.opts.max_inflight_time_travel_query
458            ))
459        })?;
460        let epoch_to_version = hummock_epoch_to_version::Entity::find()
461            .filter(
462                Condition::any()
463                    .add(
464                        hummock_epoch_to_version::Column::TableId
465                            .eq(i64::from(table_id.as_raw_id())),
466                    )
467                    // for backward compatibility
468                    .add(hummock_epoch_to_version::Column::TableId.eq(0)),
469            )
470            .filter(
471                hummock_epoch_to_version::Column::Epoch
472                    .lte(risingwave_meta_model::Epoch::try_from(query_epoch).unwrap()),
473            )
474            .order_by_desc(hummock_epoch_to_version::Column::Epoch)
475            .one(&sql_store.conn)
476            .await?
477            .ok_or_else(|| Error::TimeTravelVersionExpired {
478                table_id,
479                epoch: query_epoch,
480            })?;
481        let timer = self
482            .metrics
483            .time_travel_version_replay_latency
484            .start_timer();
485        let actual_version_id = epoch_to_version.version_id;
486        tracing::debug!(
487            query_epoch,
488            query_tz = ?(Epoch(query_epoch).as_timestamptz()),
489            actual_epoch = epoch_to_version.epoch,
490            actual_tz = ?(Epoch(u64::try_from(epoch_to_version.epoch).unwrap()).as_timestamptz()),
491            %actual_version_id,
492            "convert query epoch"
493        );
494
495        let replay_version = hummock_time_travel_version::Entity::find()
496            .filter(hummock_time_travel_version::Column::VersionId.lte(actual_version_id))
497            .order_by_desc(hummock_time_travel_version::Column::VersionId)
498            .one(&sql_store.conn)
499            .await?
500            .ok_or_else(|| Error::TimeTravelVersionExpired {
501                table_id,
502                epoch: query_epoch,
503            })?;
504        let deltas = hummock_time_travel_delta::Entity::find()
505            .filter(hummock_time_travel_delta::Column::VersionId.gt(replay_version.version_id))
506            .filter(hummock_time_travel_delta::Column::VersionId.lte(actual_version_id))
507            .order_by_asc(hummock_time_travel_delta::Column::VersionId)
508            .all(&sql_store.conn)
509            .await?;
510        // SstableInfo in actual_version is incomplete before refill_version.
511        let mut actual_version = replay_archive(
512            replay_version.version.to_protobuf(),
513            deltas.into_iter().map(|d| d.version_delta.to_protobuf()),
514        );
515
516        let mut sst_ids = actual_version
517            .get_sst_ids()
518            .into_iter()
519            .collect::<VecDeque<_>>();
520        let sst_count = sst_ids.len();
521        let mut sst_id_to_info = HashMap::with_capacity(sst_count);
522        let sst_info_fetch_batch_size = self.env.opts.hummock_time_travel_sst_info_fetch_batch_size;
523        while !sst_ids.is_empty() {
524            let sst_infos = hummock_sstable_info::Entity::find()
525                .filter(hummock_sstable_info::Column::SstId.is_in(
526                    sst_ids.drain(..std::cmp::min(sst_info_fetch_batch_size, sst_ids.len())),
527                ))
528                .all(&sql_store.conn)
529                .await?;
530            for sst_info in sst_infos {
531                let sst_info: SstableInfo = sst_info.sstable_info.to_protobuf().into();
532                sst_id_to_info.insert(sst_info.sst_id, sst_info);
533            }
534        }
535        if sst_count != sst_id_to_info.len() {
536            return Err(Error::TimeTravelVersionExpired {
537                table_id,
538                epoch: query_epoch,
539            });
540        }
541        refill_version(&mut actual_version, &sst_id_to_info, table_id);
542        timer.observe_duration();
543        Ok(actual_version)
544    }
545
546    pub(crate) async fn write_time_travel_metadata(
547        &self,
548        txn: &DatabaseTransaction,
549        version: Option<&HummockVersion>,
550        delta: HummockVersionDelta,
551        time_travel_table_ids: HashSet<StateTableId>,
552        skip_sst_ids: &HashSet<HummockSstableId>,
553        tables_to_commit: impl Iterator<Item = (&TableId, &CompactionGroupId, u64)>,
554    ) -> Result<Option<HashSet<HummockSstableId>>> {
555        let _timer = self
556            .metrics
557            .time_travel_write_metadata_latency
558            .start_timer();
559        if self
560            .env
561            .system_params_reader()
562            .await
563            .time_travel_retention_ms()
564            == 0
565        {
566            return Ok(None);
567        }
568        async fn write_sstable_infos(
569            mut sst_infos: impl Iterator<Item = &SstableInfo>,
570            txn: &DatabaseTransaction,
571            batch_size: usize,
572        ) -> Result<usize> {
573            let mut count = 0;
574            let mut is_finished = false;
575            while !is_finished {
576                let mut remain = batch_size;
577                let mut batch = vec![];
578                while remain > 0 {
579                    let Some(sst_info) = sst_infos.next() else {
580                        is_finished = true;
581                        break;
582                    };
583                    batch.push(hummock_sstable_info::ActiveModel {
584                        sst_id: Set(sst_info.sst_id),
585                        object_id: Set(sst_info.object_id),
586                        sstable_info: Set(SstableInfoV2Backend::from(&sst_info.to_protobuf())),
587                    });
588                    remain -= 1;
589                    count += 1;
590                }
591                if batch.is_empty() {
592                    break;
593                }
594                hummock_sstable_info::Entity::insert_many(batch)
595                    .on_conflict_do_nothing()
596                    .exec(txn)
597                    .await?;
598            }
599            Ok(count)
600        }
601
602        let mut batch = vec![];
603        for (table_id, _cg_id, committed_epoch) in tables_to_commit {
604            let m = hummock_epoch_to_version::ActiveModel {
605                epoch: Set(committed_epoch.try_into().unwrap()),
606                table_id: Set(i64::from(table_id.as_raw_id())),
607                version_id: Set(delta.id),
608            };
609            batch.push(m);
610            if batch.len()
611                >= self
612                    .env
613                    .opts
614                    .hummock_time_travel_epoch_version_insert_batch_size
615            {
616                // There should be no conflict rows.
617                hummock_epoch_to_version::Entity::insert_many(std::mem::take(&mut batch))
618                    .do_nothing()
619                    .exec(txn)
620                    .await?;
621            }
622        }
623        if !batch.is_empty() {
624            // There should be no conflict rows.
625            hummock_epoch_to_version::Entity::insert_many(batch)
626                .do_nothing()
627                .exec(txn)
628                .await?;
629        }
630
631        let mut version_sst_ids = None;
632        if let Some(version) = version {
633            // `version_sst_ids` is used to update `last_time_travel_snapshot_sst_ids`.
634            version_sst_ids = Some(
635                version
636                    .get_sst_infos()
637                    .filter_map(|s| {
638                        if s.table_ids
639                            .iter()
640                            .any(|tid| time_travel_table_ids.contains(tid))
641                        {
642                            return Some(s.sst_id);
643                        }
644                        None
645                    })
646                    .collect(),
647            );
648            write_sstable_infos(
649                version.get_sst_infos().filter(|s| {
650                    !skip_sst_ids.contains(&s.sst_id)
651                        && s.table_ids
652                            .iter()
653                            .any(|tid| time_travel_table_ids.contains(tid))
654                }),
655                txn,
656                self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
657            )
658            .await?;
659            let m = hummock_time_travel_version::ActiveModel {
660                version_id: Set(version.id),
661                version: Set(
662                    (&IncompleteHummockVersion::from((version, &time_travel_table_ids))
663                        .to_protobuf())
664                        .into(),
665                ),
666            };
667            hummock_time_travel_version::Entity::insert(m)
668                .on_conflict_do_nothing()
669                .exec(txn)
670                .await?;
671            // Return early to skip persisting delta.
672            return Ok(version_sst_ids);
673        }
674        let written = write_sstable_infos(
675            delta.newly_added_sst_infos(true).filter(|s| {
676                !skip_sst_ids.contains(&s.sst_id)
677                    && s.table_ids
678                        .iter()
679                        .any(|tid| time_travel_table_ids.contains(tid))
680            }),
681            txn,
682            self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
683        )
684        .await?;
685        let has_state_table_info_delta = delta
686            .state_table_info_delta
687            .keys()
688            .any(|table_id| time_travel_table_ids.contains(table_id));
689        if written > 0 || has_state_table_info_delta {
690            let m = hummock_time_travel_delta::ActiveModel {
691                version_id: Set(delta.id),
692                version_delta: Set((&IncompleteHummockVersionDelta::from((
693                    &delta,
694                    &time_travel_table_ids,
695                ))
696                .to_protobuf())
697                    .into()),
698            };
699            hummock_time_travel_delta::Entity::insert(m)
700                .on_conflict_do_nothing()
701                .exec(txn)
702                .await?;
703        }
704
705        Ok(version_sst_ids)
706    }
707}
708
709/// The `HummockVersion` is actually `InHummockVersion`. It requires `refill_version`.
710fn replay_archive(
711    version: PbHummockVersion,
712    deltas: impl Iterator<Item = PbHummockVersionDelta>,
713) -> HummockVersion {
714    // The pb version ann pb version delta are actually written by InHummockVersion and InHummockVersionDelta, respectively.
715    // Using HummockVersion make it easier for `refill_version` later.
716    let mut last_version = HummockVersion::from_persisted_protobuf_owned(version);
717    for d in deltas {
718        let d = HummockVersionDelta::from_persisted_protobuf_owned(d);
719        debug_assert!(
720            !should_mark_next_time_travel_version_snapshot(&d),
721            "unexpected time travel delta {:?}",
722            d
723        );
724        // Need to work around the assertion in `apply_version_delta`.
725        // Because compaction deltas are not included in time travel archive.
726        while last_version.id < d.prev_id {
727            last_version.id += 1;
728        }
729        last_version.apply_version_delta(&d);
730    }
731    last_version
732}
733
734pub fn require_sql_meta_store_err() -> Error {
735    Error::TimeTravel(anyhow!("require SQL meta store"))
736}
737
738/// Time travel delta replay only expect `NewL0SubLevel`. In all other cases, a new version snapshot should be created.
739pub fn should_mark_next_time_travel_version_snapshot(delta: &HummockVersionDelta) -> bool {
740    delta.group_deltas.iter().any(|(_, deltas)| {
741        deltas
742            .group_deltas
743            .iter()
744            .any(|d| !matches!(d, GroupDeltaCommon::NewL0SubLevel(_)))
745    })
746}