risingwave_meta/hummock/manager/
time_travel.rs1use std::collections::{HashMap, HashSet, VecDeque};
16
17use anyhow::anyhow;
18use futures::TryStreamExt;
19use risingwave_common::catalog::TableId;
20use risingwave_common::system_param::reader::SystemParamsRead;
21use risingwave_common::util::epoch::Epoch;
22use risingwave_hummock_sdk::compaction_group::StateTableId;
23use risingwave_hummock_sdk::sstable_info::SstableInfo;
24use risingwave_hummock_sdk::time_travel::{
25 IncompleteHummockVersion, IncompleteHummockVersionDelta, refill_version,
26};
27use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion, HummockVersionDelta};
28use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, HummockObjectId, HummockSstableId};
29use risingwave_meta_model::hummock_sstable_info::SstableInfoV2Backend;
30use risingwave_meta_model::{
31 HummockVersionId, hummock_epoch_to_version, hummock_sstable_info, hummock_time_travel_delta,
32 hummock_time_travel_version,
33};
34use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta};
35use sea_orm::ActiveValue::Set;
36use sea_orm::{
37 ColumnTrait, Condition, DatabaseTransaction, EntityTrait, PaginatorTrait, QueryFilter,
38 QueryOrder, QuerySelect, TransactionTrait,
39};
40use tracing::info;
41
42use crate::hummock::HummockManager;
43use crate::hummock::error::{Error, Result};
44
45impl HummockManager {
47 pub(crate) async fn init_time_travel_state(&self) -> Result<()> {
48 let sql_store = self.env.meta_store_ref();
49 let mut guard = self.versioning.write().await;
50 guard.mark_next_time_travel_version_snapshot();
51
52 guard.last_time_travel_snapshot_sst_ids = HashSet::new();
53 let Some(version) = hummock_time_travel_version::Entity::find()
54 .order_by_desc(hummock_time_travel_version::Column::VersionId)
55 .one(&sql_store.conn)
56 .await?
57 .map(|v| {
58 IncompleteHummockVersion::from_persisted_protobuf_owned(v.version.to_protobuf())
59 })
60 else {
61 return Ok(());
62 };
63 guard.last_time_travel_snapshot_sst_ids = version.get_sst_ids();
64 Ok(())
65 }
66
67 pub(crate) async fn truncate_time_travel_metadata(
68 &self,
69 epoch_watermark: HummockEpoch,
70 ) -> Result<()> {
71 let _timer = self
72 .metrics
73 .time_travel_vacuum_metadata_latency
74 .start_timer();
75 let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
76 let sql_store = self.env.meta_store_ref();
77 let txn = sql_store.conn.begin().await?;
78 let version_watermark = hummock_epoch_to_version::Entity::find()
79 .filter(
80 hummock_epoch_to_version::Column::Epoch
81 .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
82 )
83 .order_by_desc(hummock_epoch_to_version::Column::Epoch)
84 .order_by_asc(hummock_epoch_to_version::Column::VersionId)
85 .one(&txn)
86 .await?;
87 let Some(version_watermark) = version_watermark else {
88 txn.commit().await?;
89 return Ok(());
90 };
91 let mut watermark_version_id =
93 std::cmp::min(version_watermark.version_id, min_pinned_version_id);
94 if let Some(max_version_count) = self.env.opts.time_travel_vacuum_max_version_count {
95 let earliest2_version_ids = hummock_time_travel_version::Entity::find()
96 .select_only()
97 .column(hummock_time_travel_version::Column::VersionId)
98 .order_by_asc(hummock_time_travel_version::Column::VersionId)
99 .limit(2)
100 .into_tuple::<HummockVersionId>()
101 .all(&txn)
102 .await?;
103 if earliest2_version_ids.len() == 2 {
105 watermark_version_id = std::cmp::min(
106 watermark_version_id,
107 HummockVersionId::new(std::cmp::max(
108 earliest2_version_ids[0]
109 .as_raw_id()
110 .saturating_add(max_version_count.into()),
111 earliest2_version_ids[1].as_raw_id(),
112 )),
113 );
114 }
115 }
116 let res = hummock_epoch_to_version::Entity::delete_many()
117 .filter(
118 hummock_epoch_to_version::Column::Epoch
119 .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
120 )
121 .exec(&txn)
122 .await?;
123 tracing::info!(
124 epoch_watermark,
125 "Delete {} rows from hummock_epoch_to_version.",
126 res.rows_affected
127 );
128 let latest_valid_version = hummock_time_travel_version::Entity::find()
129 .filter(hummock_time_travel_version::Column::VersionId.lte(watermark_version_id))
130 .order_by_desc(hummock_time_travel_version::Column::VersionId)
131 .one(&txn)
132 .await?
133 .map(|m| {
134 IncompleteHummockVersion::from_persisted_protobuf_owned(m.version.to_protobuf())
135 });
136 let Some(latest_valid_version) = latest_valid_version else {
137 txn.commit().await?;
138 return Ok(());
139 };
140 let (
141 latest_valid_version_id,
142 latest_valid_version_sst_ids,
143 latest_valid_version_object_ids,
144 ) = {
145 (
146 latest_valid_version.id,
147 latest_valid_version.get_sst_ids(),
148 latest_valid_version
149 .get_object_ids()
150 .collect::<HashSet<_>>(),
151 )
152 };
153 let mut object_ids_to_delete: HashSet<_> = HashSet::default();
154 let version_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
155 hummock_time_travel_version::Entity::find()
156 .select_only()
157 .column(hummock_time_travel_version::Column::VersionId)
158 .filter(hummock_time_travel_version::Column::VersionId.lt(latest_valid_version_id))
159 .order_by_desc(hummock_time_travel_version::Column::VersionId)
160 .into_tuple()
161 .all(&txn)
162 .await?;
163 let delta_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
164 hummock_time_travel_delta::Entity::find()
165 .select_only()
166 .column(hummock_time_travel_delta::Column::VersionId)
167 .filter(hummock_time_travel_delta::Column::VersionId.lt(latest_valid_version_id))
168 .into_tuple()
169 .all(&txn)
170 .await?;
171 let delete_sst_batch_size = self
172 .env
173 .opts
174 .hummock_time_travel_epoch_version_insert_batch_size;
175 let delta_fetch_batch_size = self
176 .env
177 .opts
178 .hummock_time_travel_delta_fetch_batch_size
179 .max(1);
180 let mut sst_ids_to_delete: HashSet<_> = HashSet::default();
181 async fn delete_sst_in_batch(
182 txn: &DatabaseTransaction,
183 sst_ids_to_delete: HashSet<HummockSstableId>,
184 delete_sst_batch_size: usize,
185 ) -> Result<()> {
186 for start_idx in 0..=(sst_ids_to_delete.len().saturating_sub(1) / delete_sst_batch_size)
187 {
188 hummock_sstable_info::Entity::delete_many()
189 .filter(
190 hummock_sstable_info::Column::SstId.is_in(
191 sst_ids_to_delete
192 .iter()
193 .skip(start_idx * delete_sst_batch_size)
194 .take(delete_sst_batch_size)
195 .copied(),
196 ),
197 )
198 .exec(txn)
199 .await?;
200 }
201 Ok(())
202 }
203 for delta_id_batch in delta_ids_to_delete.chunks(delta_fetch_batch_size) {
204 let mut delta_to_delete_by_id: HashMap<_, _> =
205 hummock_time_travel_delta::Entity::find()
206 .filter(
207 hummock_time_travel_delta::Column::VersionId
208 .is_in(delta_id_batch.iter().copied()),
209 )
210 .all(&txn)
211 .await?
212 .into_iter()
213 .map(|delta| (delta.version_id, delta))
214 .collect();
215 for &delta_id_to_delete in delta_id_batch {
216 let delta_to_delete = delta_to_delete_by_id
217 .remove(&delta_id_to_delete)
218 .ok_or_else(|| {
219 Error::TimeTravel(anyhow!(format!(
220 "version delta {} not found",
221 delta_id_to_delete
222 )))
223 })?;
224 let delta_to_delete = IncompleteHummockVersionDelta::from_persisted_protobuf_owned(
225 delta_to_delete.version_delta.to_protobuf(),
226 );
227 let new_sst_ids = delta_to_delete.newly_added_sst_ids(true);
228 sst_ids_to_delete.extend(&new_sst_ids - &latest_valid_version_sst_ids);
230 if sst_ids_to_delete.len() >= delete_sst_batch_size {
231 delete_sst_in_batch(
232 &txn,
233 std::mem::take(&mut sst_ids_to_delete),
234 delete_sst_batch_size,
235 )
236 .await?;
237 }
238 let new_object_ids = delta_to_delete.newly_added_object_ids(true);
239 object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
240 }
241 }
242 let mut next_version_sst_ids = latest_valid_version_sst_ids;
243 for prev_version_id in version_ids_to_delete {
244 let prev_version = {
245 let prev_version = hummock_time_travel_version::Entity::find_by_id(prev_version_id)
246 .one(&txn)
247 .await?
248 .ok_or_else(|| {
249 Error::TimeTravel(anyhow!(format!(
250 "prev_version {} not found",
251 prev_version_id
252 )))
253 })?;
254 IncompleteHummockVersion::from_persisted_protobuf_owned(
255 prev_version.version.to_protobuf(),
256 )
257 };
258 let sst_ids = prev_version.get_sst_ids();
259 sst_ids_to_delete.extend(&sst_ids - &next_version_sst_ids);
261 if sst_ids_to_delete.len() >= delete_sst_batch_size {
262 delete_sst_in_batch(
263 &txn,
264 std::mem::take(&mut sst_ids_to_delete),
265 delete_sst_batch_size,
266 )
267 .await?;
268 }
269 let new_object_ids: HashSet<_> = prev_version.get_object_ids().collect();
270 object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
271 next_version_sst_ids = sst_ids;
272 }
273 if !sst_ids_to_delete.is_empty() {
274 delete_sst_in_batch(&txn, sst_ids_to_delete, delete_sst_batch_size).await?;
275 }
276
277 if !object_ids_to_delete.is_empty() {
278 self.gc_manager
281 .add_may_delete_object_ids(object_ids_to_delete.into_iter());
282 }
283
284 let res = hummock_time_travel_version::Entity::delete_many()
285 .filter(hummock_time_travel_version::Column::VersionId.lt(latest_valid_version_id))
286 .exec(&txn)
287 .await?;
288 tracing::info!(
289 %watermark_version_id,
290 %latest_valid_version_id,
291 "Deleted {} rows from hummock_time_travel_version.",
292 res.rows_affected
293 );
294
295 let res = hummock_time_travel_delta::Entity::delete_many()
296 .filter(hummock_time_travel_delta::Column::VersionId.lt(latest_valid_version_id))
297 .exec(&txn)
298 .await?;
299 tracing::info!(
300 %watermark_version_id,
301 %latest_valid_version_id,
302 "Deleted {} rows from hummock_time_travel_delta.",
303 res.rows_affected
304 );
305
306 txn.commit().await?;
307 Ok(())
308 }
309
310 pub(crate) async fn filter_out_objects_by_time_travel_v1(
311 &self,
312 objects: impl Iterator<Item = HummockObjectId>,
313 ) -> Result<HashSet<HummockObjectId>> {
314 let batch_size = self
315 .env
316 .opts
317 .hummock_time_travel_filter_out_objects_batch_size;
318 info!("filter out objects by time travel v1, only sst will remain in the result set");
319 let mut result: HashSet<_> = objects
322 .filter(|object_id| match object_id {
323 HummockObjectId::Sstable(_) => true,
324 HummockObjectId::VectorFile(_) | HummockObjectId::HnswGraphFile(_) => false,
325 })
326 .collect();
327 let mut remain_sst: VecDeque<_> = result.iter().copied().collect();
328 while !remain_sst.is_empty() {
329 let batch = remain_sst
330 .drain(..std::cmp::min(remain_sst.len(), batch_size))
331 .map(|object_id| object_id.as_raw());
332 let reject_object_ids: Vec<risingwave_meta_model::HummockSstableObjectId> =
333 hummock_sstable_info::Entity::find()
334 .filter(hummock_sstable_info::Column::ObjectId.is_in(batch))
335 .select_only()
336 .column(hummock_sstable_info::Column::ObjectId)
337 .into_tuple()
338 .all(&self.env.meta_store_ref().conn)
339 .await?;
340 for reject in reject_object_ids {
341 let object_id = HummockObjectId::Sstable(reject);
342 result.remove(&object_id);
343 }
344 }
345 Ok(result)
346 }
347
348 pub(crate) async fn filter_out_objects_by_time_travel(
349 &self,
350 objects: impl Iterator<Item = HummockObjectId>,
351 ) -> Result<HashSet<HummockObjectId>> {
352 if self.env.opts.hummock_time_travel_filter_out_objects_v1 {
353 return self.filter_out_objects_by_time_travel_v1(objects).await;
354 }
355 let mut result: HashSet<_> = objects.collect();
356
357 {
359 let mut prev_version_id: Option<HummockVersionId> = None;
360 loop {
361 let query = hummock_time_travel_version::Entity::find();
362 let query = if let Some(prev_version_id) = prev_version_id {
363 query.filter(hummock_time_travel_version::Column::VersionId.gt(prev_version_id))
364 } else {
365 query
366 };
367 let mut version_stream = query
368 .order_by_asc(hummock_time_travel_version::Column::VersionId)
369 .limit(
370 self.env
371 .opts
372 .hummock_time_travel_filter_out_objects_list_version_batch_size
373 as u64,
374 )
375 .stream(&self.env.meta_store_ref().conn)
376 .await?;
377 let mut next_prev_version_id = None;
378 while let Some(model) = version_stream.try_next().await? {
379 let version =
380 HummockVersion::from_persisted_protobuf_owned(model.version.to_protobuf());
381 for object_id in version.get_object_ids() {
382 result.remove(&object_id);
383 }
384 next_prev_version_id = Some(model.version_id);
385 }
386 if let Some(next_prev_version_id) = next_prev_version_id {
387 prev_version_id = Some(next_prev_version_id);
388 } else {
389 break;
390 }
391 }
392 }
393
394 {
396 let mut prev_version_id: Option<HummockVersionId> = None;
397 loop {
398 let query = hummock_time_travel_delta::Entity::find();
399 let query = if let Some(prev_version_id) = prev_version_id {
400 query.filter(hummock_time_travel_delta::Column::VersionId.gt(prev_version_id))
401 } else {
402 query
403 };
404 let mut version_stream = query
405 .order_by_asc(hummock_time_travel_delta::Column::VersionId)
406 .limit(
407 self.env
408 .opts
409 .hummock_time_travel_filter_out_objects_list_delta_batch_size
410 as u64,
411 )
412 .stream(&self.env.meta_store_ref().conn)
413 .await?;
414 let mut next_prev_version_id = None;
415 while let Some(model) = version_stream.try_next().await? {
416 let version_delta = HummockVersionDelta::from_persisted_protobuf_owned(
417 model.version_delta.to_protobuf(),
418 );
419 for object_id in version_delta.newly_added_object_ids(true) {
421 result.remove(&object_id);
422 }
423 next_prev_version_id = Some(model.version_id);
424 }
425 if let Some(next_prev_version_id) = next_prev_version_id {
426 prev_version_id = Some(next_prev_version_id);
427 } else {
428 break;
429 }
430 }
431 }
432
433 Ok(result)
434 }
435
436 pub(crate) async fn time_travel_pinned_object_count(&self) -> Result<u64> {
437 let count = hummock_sstable_info::Entity::find()
438 .count(&self.env.meta_store_ref().conn)
439 .await?;
440 Ok(count)
441 }
442
443 pub async fn epoch_to_version(
449 &self,
450 query_epoch: HummockEpoch,
451 table_id: TableId,
452 ) -> Result<HummockVersion> {
453 let sql_store = self.env.meta_store_ref();
454 let _permit = self.inflight_time_travel_query.try_acquire().map_err(|_| {
455 anyhow!(format!(
456 "too many inflight time travel queries, max_inflight_time_travel_query={}",
457 self.env.opts.max_inflight_time_travel_query
458 ))
459 })?;
460 let epoch_to_version = hummock_epoch_to_version::Entity::find()
461 .filter(
462 Condition::any()
463 .add(
464 hummock_epoch_to_version::Column::TableId
465 .eq(i64::from(table_id.as_raw_id())),
466 )
467 .add(hummock_epoch_to_version::Column::TableId.eq(0)),
469 )
470 .filter(
471 hummock_epoch_to_version::Column::Epoch
472 .lte(risingwave_meta_model::Epoch::try_from(query_epoch).unwrap()),
473 )
474 .order_by_desc(hummock_epoch_to_version::Column::Epoch)
475 .one(&sql_store.conn)
476 .await?
477 .ok_or_else(|| Error::TimeTravelVersionExpired {
478 table_id,
479 epoch: query_epoch,
480 })?;
481 let timer = self
482 .metrics
483 .time_travel_version_replay_latency
484 .start_timer();
485 let actual_version_id = epoch_to_version.version_id;
486 tracing::debug!(
487 query_epoch,
488 query_tz = ?(Epoch(query_epoch).as_timestamptz()),
489 actual_epoch = epoch_to_version.epoch,
490 actual_tz = ?(Epoch(u64::try_from(epoch_to_version.epoch).unwrap()).as_timestamptz()),
491 %actual_version_id,
492 "convert query epoch"
493 );
494
495 let replay_version = hummock_time_travel_version::Entity::find()
496 .filter(hummock_time_travel_version::Column::VersionId.lte(actual_version_id))
497 .order_by_desc(hummock_time_travel_version::Column::VersionId)
498 .one(&sql_store.conn)
499 .await?
500 .ok_or_else(|| Error::TimeTravelVersionExpired {
501 table_id,
502 epoch: query_epoch,
503 })?;
504 let deltas = hummock_time_travel_delta::Entity::find()
505 .filter(hummock_time_travel_delta::Column::VersionId.gt(replay_version.version_id))
506 .filter(hummock_time_travel_delta::Column::VersionId.lte(actual_version_id))
507 .order_by_asc(hummock_time_travel_delta::Column::VersionId)
508 .all(&sql_store.conn)
509 .await?;
510 let mut actual_version = replay_archive(
512 replay_version.version.to_protobuf(),
513 deltas.into_iter().map(|d| d.version_delta.to_protobuf()),
514 );
515
516 let mut sst_ids = actual_version
517 .get_sst_ids()
518 .into_iter()
519 .collect::<VecDeque<_>>();
520 let sst_count = sst_ids.len();
521 let mut sst_id_to_info = HashMap::with_capacity(sst_count);
522 let sst_info_fetch_batch_size = self.env.opts.hummock_time_travel_sst_info_fetch_batch_size;
523 while !sst_ids.is_empty() {
524 let sst_infos = hummock_sstable_info::Entity::find()
525 .filter(hummock_sstable_info::Column::SstId.is_in(
526 sst_ids.drain(..std::cmp::min(sst_info_fetch_batch_size, sst_ids.len())),
527 ))
528 .all(&sql_store.conn)
529 .await?;
530 for sst_info in sst_infos {
531 let sst_info: SstableInfo = sst_info.sstable_info.to_protobuf().into();
532 sst_id_to_info.insert(sst_info.sst_id, sst_info);
533 }
534 }
535 if sst_count != sst_id_to_info.len() {
536 return Err(Error::TimeTravelVersionExpired {
537 table_id,
538 epoch: query_epoch,
539 });
540 }
541 refill_version(&mut actual_version, &sst_id_to_info, table_id);
542 timer.observe_duration();
543 Ok(actual_version)
544 }
545
546 pub(crate) async fn write_time_travel_metadata(
547 &self,
548 txn: &DatabaseTransaction,
549 version: Option<&HummockVersion>,
550 delta: HummockVersionDelta,
551 time_travel_table_ids: HashSet<StateTableId>,
552 skip_sst_ids: &HashSet<HummockSstableId>,
553 tables_to_commit: impl Iterator<Item = (&TableId, &CompactionGroupId, u64)>,
554 ) -> Result<Option<HashSet<HummockSstableId>>> {
555 let _timer = self
556 .metrics
557 .time_travel_write_metadata_latency
558 .start_timer();
559 if self
560 .env
561 .system_params_reader()
562 .await
563 .time_travel_retention_ms()
564 == 0
565 {
566 return Ok(None);
567 }
568 async fn write_sstable_infos(
569 mut sst_infos: impl Iterator<Item = &SstableInfo>,
570 txn: &DatabaseTransaction,
571 batch_size: usize,
572 ) -> Result<usize> {
573 let mut count = 0;
574 let mut is_finished = false;
575 while !is_finished {
576 let mut remain = batch_size;
577 let mut batch = vec![];
578 while remain > 0 {
579 let Some(sst_info) = sst_infos.next() else {
580 is_finished = true;
581 break;
582 };
583 batch.push(hummock_sstable_info::ActiveModel {
584 sst_id: Set(sst_info.sst_id),
585 object_id: Set(sst_info.object_id),
586 sstable_info: Set(SstableInfoV2Backend::from(&sst_info.to_protobuf())),
587 });
588 remain -= 1;
589 count += 1;
590 }
591 if batch.is_empty() {
592 break;
593 }
594 hummock_sstable_info::Entity::insert_many(batch)
595 .on_conflict_do_nothing()
596 .exec(txn)
597 .await?;
598 }
599 Ok(count)
600 }
601
602 let mut batch = vec![];
603 for (table_id, _cg_id, committed_epoch) in tables_to_commit {
604 let m = hummock_epoch_to_version::ActiveModel {
605 epoch: Set(committed_epoch.try_into().unwrap()),
606 table_id: Set(i64::from(table_id.as_raw_id())),
607 version_id: Set(delta.id),
608 };
609 batch.push(m);
610 if batch.len()
611 >= self
612 .env
613 .opts
614 .hummock_time_travel_epoch_version_insert_batch_size
615 {
616 hummock_epoch_to_version::Entity::insert_many(std::mem::take(&mut batch))
618 .do_nothing()
619 .exec(txn)
620 .await?;
621 }
622 }
623 if !batch.is_empty() {
624 hummock_epoch_to_version::Entity::insert_many(batch)
626 .do_nothing()
627 .exec(txn)
628 .await?;
629 }
630
631 let mut version_sst_ids = None;
632 if let Some(version) = version {
633 version_sst_ids = Some(
635 version
636 .get_sst_infos()
637 .filter_map(|s| {
638 if s.table_ids
639 .iter()
640 .any(|tid| time_travel_table_ids.contains(tid))
641 {
642 return Some(s.sst_id);
643 }
644 None
645 })
646 .collect(),
647 );
648 write_sstable_infos(
649 version.get_sst_infos().filter(|s| {
650 !skip_sst_ids.contains(&s.sst_id)
651 && s.table_ids
652 .iter()
653 .any(|tid| time_travel_table_ids.contains(tid))
654 }),
655 txn,
656 self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
657 )
658 .await?;
659 let m = hummock_time_travel_version::ActiveModel {
660 version_id: Set(version.id),
661 version: Set(
662 (&IncompleteHummockVersion::from((version, &time_travel_table_ids))
663 .to_protobuf())
664 .into(),
665 ),
666 };
667 hummock_time_travel_version::Entity::insert(m)
668 .on_conflict_do_nothing()
669 .exec(txn)
670 .await?;
671 return Ok(version_sst_ids);
673 }
674 let written = write_sstable_infos(
675 delta.newly_added_sst_infos(true).filter(|s| {
676 !skip_sst_ids.contains(&s.sst_id)
677 && s.table_ids
678 .iter()
679 .any(|tid| time_travel_table_ids.contains(tid))
680 }),
681 txn,
682 self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
683 )
684 .await?;
685 let has_state_table_info_delta = delta
686 .state_table_info_delta
687 .keys()
688 .any(|table_id| time_travel_table_ids.contains(table_id));
689 if written > 0 || has_state_table_info_delta {
690 let m = hummock_time_travel_delta::ActiveModel {
691 version_id: Set(delta.id),
692 version_delta: Set((&IncompleteHummockVersionDelta::from((
693 &delta,
694 &time_travel_table_ids,
695 ))
696 .to_protobuf())
697 .into()),
698 };
699 hummock_time_travel_delta::Entity::insert(m)
700 .on_conflict_do_nothing()
701 .exec(txn)
702 .await?;
703 }
704
705 Ok(version_sst_ids)
706 }
707}
708
709fn replay_archive(
711 version: PbHummockVersion,
712 deltas: impl Iterator<Item = PbHummockVersionDelta>,
713) -> HummockVersion {
714 let mut last_version = HummockVersion::from_persisted_protobuf_owned(version);
717 for d in deltas {
718 let d = HummockVersionDelta::from_persisted_protobuf_owned(d);
719 debug_assert!(
720 !should_mark_next_time_travel_version_snapshot(&d),
721 "unexpected time travel delta {:?}",
722 d
723 );
724 while last_version.id < d.prev_id {
727 last_version.id += 1;
728 }
729 last_version.apply_version_delta(&d);
730 }
731 last_version
732}
733
734pub fn require_sql_meta_store_err() -> Error {
735 Error::TimeTravel(anyhow!("require SQL meta store"))
736}
737
738pub fn should_mark_next_time_travel_version_snapshot(delta: &HummockVersionDelta) -> bool {
740 delta.group_deltas.iter().any(|(_, deltas)| {
741 deltas
742 .group_deltas
743 .iter()
744 .any(|d| !matches!(d, GroupDeltaCommon::NewL0SubLevel(_)))
745 })
746}