risingwave_meta/hummock/manager/
time_travel.rs1use std::collections::{HashMap, HashSet, VecDeque};
16
17use anyhow::anyhow;
18use futures::TryStreamExt;
19use risingwave_common::catalog::TableId;
20use risingwave_common::system_param::reader::SystemParamsRead;
21use risingwave_common::util::epoch::Epoch;
22use risingwave_hummock_sdk::compaction_group::StateTableId;
23use risingwave_hummock_sdk::sstable_info::SstableInfo;
24use risingwave_hummock_sdk::time_travel::{
25 IncompleteHummockVersion, IncompleteHummockVersionDelta, refill_version,
26};
27use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion, HummockVersionDelta};
28use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, HummockObjectId, HummockSstableId};
29use risingwave_meta_model::hummock_sstable_info::SstableInfoV2Backend;
30use risingwave_meta_model::{
31 HummockVersionId, hummock_epoch_to_version, hummock_sstable_info, hummock_time_travel_delta,
32 hummock_time_travel_version,
33};
34use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta};
35use sea_orm::ActiveValue::Set;
36use sea_orm::{
37 ColumnTrait, Condition, DatabaseTransaction, EntityTrait, PaginatorTrait, QueryFilter,
38 QueryOrder, QuerySelect, TransactionTrait,
39};
40use tracing::info;
41
42use crate::hummock::HummockManager;
43use crate::hummock::error::{Error, Result};
44
45impl HummockManager {
47 pub(crate) async fn init_time_travel_state(&self) -> Result<()> {
48 let sql_store = self.env.meta_store_ref();
49 let mut guard = self.versioning.write().await;
50 guard.mark_next_time_travel_version_snapshot();
51
52 guard.last_time_travel_snapshot_sst_ids = HashSet::new();
53 let Some(version) = hummock_time_travel_version::Entity::find()
54 .order_by_desc(hummock_time_travel_version::Column::VersionId)
55 .one(&sql_store.conn)
56 .await?
57 .map(|v| {
58 IncompleteHummockVersion::from_persisted_protobuf_owned(v.version.to_protobuf())
59 })
60 else {
61 return Ok(());
62 };
63 guard.last_time_travel_snapshot_sst_ids = version.get_sst_ids();
64 Ok(())
65 }
66
67 pub(crate) async fn truncate_time_travel_metadata(
68 &self,
69 epoch_watermark: HummockEpoch,
70 ) -> Result<()> {
71 let _timer = self
72 .metrics
73 .time_travel_vacuum_metadata_latency
74 .start_timer();
75 let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
76 let sql_store = self.env.meta_store_ref();
77 let txn = sql_store.conn.begin().await?;
78 let version_watermark = hummock_epoch_to_version::Entity::find()
79 .filter(
80 hummock_epoch_to_version::Column::Epoch
81 .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
82 )
83 .order_by_desc(hummock_epoch_to_version::Column::Epoch)
84 .order_by_asc(hummock_epoch_to_version::Column::VersionId)
85 .one(&txn)
86 .await?;
87 let Some(version_watermark) = version_watermark else {
88 txn.commit().await?;
89 return Ok(());
90 };
91 let mut watermark_version_id =
93 std::cmp::min(version_watermark.version_id, min_pinned_version_id);
94 if let Some(max_version_count) = self.env.opts.time_travel_vacuum_max_version_count {
95 let earliest2_version_ids = hummock_time_travel_version::Entity::find()
96 .select_only()
97 .column(hummock_time_travel_version::Column::VersionId)
98 .order_by_asc(hummock_time_travel_version::Column::VersionId)
99 .limit(2)
100 .into_tuple::<HummockVersionId>()
101 .all(&txn)
102 .await?;
103 if earliest2_version_ids.len() == 2 {
105 watermark_version_id = std::cmp::min(
106 watermark_version_id,
107 HummockVersionId::new(std::cmp::max(
108 earliest2_version_ids[0]
109 .as_raw_id()
110 .saturating_add(max_version_count.into()),
111 earliest2_version_ids[1].as_raw_id(),
112 )),
113 );
114 }
115 }
116 let res = hummock_epoch_to_version::Entity::delete_many()
117 .filter(
118 hummock_epoch_to_version::Column::Epoch
119 .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
120 )
121 .exec(&txn)
122 .await?;
123 tracing::info!(
124 epoch_watermark,
125 "Delete {} rows from hummock_epoch_to_version.",
126 res.rows_affected
127 );
128 let latest_valid_version = hummock_time_travel_version::Entity::find()
129 .filter(hummock_time_travel_version::Column::VersionId.lte(watermark_version_id))
130 .order_by_desc(hummock_time_travel_version::Column::VersionId)
131 .one(&txn)
132 .await?
133 .map(|m| {
134 IncompleteHummockVersion::from_persisted_protobuf_owned(m.version.to_protobuf())
135 });
136 let Some(latest_valid_version) = latest_valid_version else {
137 txn.commit().await?;
138 return Ok(());
139 };
140 let (
141 latest_valid_version_id,
142 latest_valid_version_sst_ids,
143 latest_valid_version_object_ids,
144 ) = {
145 (
146 latest_valid_version.id,
147 latest_valid_version.get_sst_ids(),
148 latest_valid_version
149 .get_object_ids()
150 .collect::<HashSet<_>>(),
151 )
152 };
153 let mut object_ids_to_delete: HashSet<_> = HashSet::default();
154 let version_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
155 hummock_time_travel_version::Entity::find()
156 .select_only()
157 .column(hummock_time_travel_version::Column::VersionId)
158 .filter(hummock_time_travel_version::Column::VersionId.lt(latest_valid_version_id))
159 .order_by_desc(hummock_time_travel_version::Column::VersionId)
160 .into_tuple()
161 .all(&txn)
162 .await?;
163 let delta_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
164 hummock_time_travel_delta::Entity::find()
165 .select_only()
166 .column(hummock_time_travel_delta::Column::VersionId)
167 .filter(hummock_time_travel_delta::Column::VersionId.lt(latest_valid_version_id))
168 .into_tuple()
169 .all(&txn)
170 .await?;
171 let delete_sst_batch_size = self
172 .env
173 .opts
174 .hummock_time_travel_epoch_version_insert_batch_size;
175 let delta_fetch_batch_size = self
176 .env
177 .opts
178 .hummock_time_travel_delta_fetch_batch_size
179 .max(1);
180 let mut sst_ids_to_delete: HashSet<_> = HashSet::default();
181 async fn delete_sst_in_batch(
182 txn: &DatabaseTransaction,
183 sst_ids_to_delete: HashSet<HummockSstableId>,
184 delete_sst_batch_size: usize,
185 ) -> Result<()> {
186 for start_idx in 0..=(sst_ids_to_delete.len().saturating_sub(1) / delete_sst_batch_size)
187 {
188 hummock_sstable_info::Entity::delete_many()
189 .filter(
190 hummock_sstable_info::Column::SstId.is_in(
191 sst_ids_to_delete
192 .iter()
193 .skip(start_idx * delete_sst_batch_size)
194 .take(delete_sst_batch_size)
195 .copied(),
196 ),
197 )
198 .exec(txn)
199 .await?;
200 }
201 Ok(())
202 }
203 for delta_id_batch in delta_ids_to_delete.chunks(delta_fetch_batch_size) {
204 let mut delta_to_delete_by_id: HashMap<_, _> =
205 hummock_time_travel_delta::Entity::find()
206 .filter(
207 hummock_time_travel_delta::Column::VersionId
208 .is_in(delta_id_batch.iter().copied()),
209 )
210 .all(&txn)
211 .await?
212 .into_iter()
213 .map(|delta| (delta.version_id, delta))
214 .collect();
215 for &delta_id_to_delete in delta_id_batch {
216 let delta_to_delete = delta_to_delete_by_id
217 .remove(&delta_id_to_delete)
218 .ok_or_else(|| {
219 Error::TimeTravel(anyhow!(format!(
220 "version delta {} not found",
221 delta_id_to_delete
222 )))
223 })?;
224 let delta_to_delete = IncompleteHummockVersionDelta::from_persisted_protobuf_owned(
225 delta_to_delete.version_delta.to_protobuf(),
226 );
227 let new_sst_ids = delta_to_delete.newly_added_sst_ids(true);
228 sst_ids_to_delete.extend(&new_sst_ids - &latest_valid_version_sst_ids);
230 if sst_ids_to_delete.len() >= delete_sst_batch_size {
231 delete_sst_in_batch(
232 &txn,
233 std::mem::take(&mut sst_ids_to_delete),
234 delete_sst_batch_size,
235 )
236 .await?;
237 }
238 let new_object_ids = delta_to_delete.newly_added_object_ids(true);
239 object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
240 }
241 }
242 let mut next_version_sst_ids = latest_valid_version_sst_ids;
243 for prev_version_id in version_ids_to_delete {
244 let prev_version = {
245 let prev_version = hummock_time_travel_version::Entity::find_by_id(prev_version_id)
246 .one(&txn)
247 .await?
248 .ok_or_else(|| {
249 Error::TimeTravel(anyhow!(format!(
250 "prev_version {} not found",
251 prev_version_id
252 )))
253 })?;
254 IncompleteHummockVersion::from_persisted_protobuf_owned(
255 prev_version.version.to_protobuf(),
256 )
257 };
258 let sst_ids = prev_version.get_sst_ids();
259 sst_ids_to_delete.extend(&sst_ids - &next_version_sst_ids);
261 if sst_ids_to_delete.len() >= delete_sst_batch_size {
262 delete_sst_in_batch(
263 &txn,
264 std::mem::take(&mut sst_ids_to_delete),
265 delete_sst_batch_size,
266 )
267 .await?;
268 }
269 let new_object_ids: HashSet<_> = prev_version.get_object_ids().collect();
270 object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
271 next_version_sst_ids = sst_ids;
272 }
273 if !sst_ids_to_delete.is_empty() {
274 delete_sst_in_batch(&txn, sst_ids_to_delete, delete_sst_batch_size).await?;
275 }
276
277 if !object_ids_to_delete.is_empty() {
278 self.gc_manager
281 .add_may_delete_object_ids(object_ids_to_delete.into_iter());
282 }
283
284 let res = hummock_time_travel_version::Entity::delete_many()
285 .filter(hummock_time_travel_version::Column::VersionId.lt(latest_valid_version_id))
286 .exec(&txn)
287 .await?;
288 tracing::info!(
289 %watermark_version_id,
290 %latest_valid_version_id,
291 "Deleted {} rows from hummock_time_travel_version.",
292 res.rows_affected
293 );
294
295 let res = hummock_time_travel_delta::Entity::delete_many()
296 .filter(hummock_time_travel_delta::Column::VersionId.lt(latest_valid_version_id))
297 .exec(&txn)
298 .await?;
299 tracing::info!(
300 %watermark_version_id,
301 %latest_valid_version_id,
302 "Deleted {} rows from hummock_time_travel_delta.",
303 res.rows_affected
304 );
305
306 txn.commit().await?;
307 Ok(())
308 }
309
310 pub(crate) async fn filter_out_objects_by_time_travel_v1(
311 &self,
312 objects: impl Iterator<Item = HummockObjectId>,
313 ) -> Result<HashSet<HummockObjectId>> {
314 let batch_size = self
315 .env
316 .opts
317 .hummock_time_travel_filter_out_objects_batch_size;
318 info!("filter out objects by time travel v1, only sst will remain in the result set");
319 let mut result: HashSet<_> = objects
322 .filter(|object_id| match object_id {
323 HummockObjectId::Sstable(_) => true,
324 HummockObjectId::VectorFile(_) | HummockObjectId::HnswGraphFile(_) => false,
325 })
326 .collect();
327 let mut remain_sst: VecDeque<_> = result.iter().copied().collect();
328 while !remain_sst.is_empty() {
329 let batch = remain_sst
330 .drain(..std::cmp::min(remain_sst.len(), batch_size))
331 .map(|object_id| object_id.as_raw());
332 let reject_object_ids: Vec<risingwave_meta_model::HummockSstableObjectId> =
333 hummock_sstable_info::Entity::find()
334 .filter(hummock_sstable_info::Column::ObjectId.is_in(batch))
335 .select_only()
336 .column(hummock_sstable_info::Column::ObjectId)
337 .into_tuple()
338 .all(&self.env.meta_store_ref().conn)
339 .await?;
340 for reject in reject_object_ids {
341 let object_id = HummockObjectId::Sstable(reject);
342 result.remove(&object_id);
343 }
344 }
345 Ok(result)
346 }
347
348 pub(crate) async fn filter_out_objects_by_time_travel(
349 &self,
350 objects: impl Iterator<Item = HummockObjectId>,
351 ) -> Result<HashSet<HummockObjectId>> {
352 if self.env.opts.hummock_time_travel_filter_out_objects_v1 {
353 return self.filter_out_objects_by_time_travel_v1(objects).await;
354 }
355 let mut result: HashSet<_> = objects.collect();
356
357 {
359 let mut prev_version_id: Option<HummockVersionId> = None;
360 loop {
361 let query = hummock_time_travel_version::Entity::find();
362 let query = if let Some(prev_version_id) = prev_version_id {
363 query.filter(hummock_time_travel_version::Column::VersionId.gt(prev_version_id))
364 } else {
365 query
366 };
367 let mut version_stream = query
368 .order_by_asc(hummock_time_travel_version::Column::VersionId)
369 .limit(
370 self.env
371 .opts
372 .hummock_time_travel_filter_out_objects_list_version_batch_size
373 as u64,
374 )
375 .stream(&self.env.meta_store_ref().conn)
376 .await?;
377 let mut next_prev_version_id = None;
378 while let Some(model) = version_stream.try_next().await? {
379 let version =
380 HummockVersion::from_persisted_protobuf_owned(model.version.to_protobuf());
381 for object_id in version.get_object_ids() {
382 result.remove(&object_id);
383 }
384 next_prev_version_id = Some(model.version_id);
385 }
386 if let Some(next_prev_version_id) = next_prev_version_id {
387 prev_version_id = Some(next_prev_version_id);
388 } else {
389 break;
390 }
391 }
392 }
393
394 {
396 let mut prev_version_id: Option<HummockVersionId> = None;
397 loop {
398 let query = hummock_time_travel_delta::Entity::find();
399 let query = if let Some(prev_version_id) = prev_version_id {
400 query.filter(hummock_time_travel_delta::Column::VersionId.gt(prev_version_id))
401 } else {
402 query
403 };
404 let mut version_stream = query
405 .order_by_asc(hummock_time_travel_delta::Column::VersionId)
406 .limit(
407 self.env
408 .opts
409 .hummock_time_travel_filter_out_objects_list_delta_batch_size
410 as u64,
411 )
412 .stream(&self.env.meta_store_ref().conn)
413 .await?;
414 let mut next_prev_version_id = None;
415 while let Some(model) = version_stream.try_next().await? {
416 let version_delta = HummockVersionDelta::from_persisted_protobuf_owned(
417 model.version_delta.to_protobuf(),
418 );
419 for object_id in version_delta.newly_added_object_ids(true) {
421 result.remove(&object_id);
422 }
423 next_prev_version_id = Some(model.version_id);
424 }
425 if let Some(next_prev_version_id) = next_prev_version_id {
426 prev_version_id = Some(next_prev_version_id);
427 } else {
428 break;
429 }
430 }
431 }
432
433 Ok(result)
434 }
435
436 pub(crate) async fn time_travel_pinned_object_count(&self) -> Result<u64> {
437 let count = hummock_sstable_info::Entity::find()
438 .count(&self.env.meta_store_ref().conn)
439 .await?;
440 Ok(count)
441 }
442
443 pub async fn epoch_to_version(
449 &self,
450 query_epoch: HummockEpoch,
451 table_id: TableId,
452 ) -> Result<HummockVersion> {
453 let sql_store = self.env.meta_store_ref();
454 let _permit = self.inflight_time_travel_query.try_acquire().map_err(|_| {
455 anyhow!(format!(
456 "too many inflight time travel queries, max_inflight_time_travel_query={}",
457 self.env.opts.max_inflight_time_travel_query
458 ))
459 })?;
460 let epoch_to_version = hummock_epoch_to_version::Entity::find()
461 .filter(
462 Condition::any()
463 .add(
464 hummock_epoch_to_version::Column::TableId
465 .eq(i64::from(table_id.as_raw_id())),
466 )
467 .add(hummock_epoch_to_version::Column::TableId.eq(0)),
469 )
470 .filter(
471 hummock_epoch_to_version::Column::Epoch
472 .lte(risingwave_meta_model::Epoch::try_from(query_epoch).unwrap()),
473 )
474 .order_by_desc(hummock_epoch_to_version::Column::Epoch)
475 .one(&sql_store.conn)
476 .await?
477 .ok_or_else(|| {
478 Error::TimeTravel(anyhow!(format!(
479 "version not found for epoch {}",
480 query_epoch
481 )))
482 })?;
483 let timer = self
484 .metrics
485 .time_travel_version_replay_latency
486 .start_timer();
487 let actual_version_id = epoch_to_version.version_id;
488 tracing::debug!(
489 query_epoch,
490 query_tz = ?(Epoch(query_epoch).as_timestamptz()),
491 actual_epoch = epoch_to_version.epoch,
492 actual_tz = ?(Epoch(u64::try_from(epoch_to_version.epoch).unwrap()).as_timestamptz()),
493 %actual_version_id,
494 "convert query epoch"
495 );
496
497 let replay_version = hummock_time_travel_version::Entity::find()
498 .filter(hummock_time_travel_version::Column::VersionId.lte(actual_version_id))
499 .order_by_desc(hummock_time_travel_version::Column::VersionId)
500 .one(&sql_store.conn)
501 .await?
502 .ok_or_else(|| {
503 Error::TimeTravel(anyhow!(format!(
504 "no replay version found for epoch {}, version {}",
505 query_epoch, actual_version_id,
506 )))
507 })?;
508 let deltas = hummock_time_travel_delta::Entity::find()
509 .filter(hummock_time_travel_delta::Column::VersionId.gt(replay_version.version_id))
510 .filter(hummock_time_travel_delta::Column::VersionId.lte(actual_version_id))
511 .order_by_asc(hummock_time_travel_delta::Column::VersionId)
512 .all(&sql_store.conn)
513 .await?;
514 let mut actual_version = replay_archive(
516 replay_version.version.to_protobuf(),
517 deltas.into_iter().map(|d| d.version_delta.to_protobuf()),
518 );
519
520 let mut sst_ids = actual_version
521 .get_sst_ids()
522 .into_iter()
523 .collect::<VecDeque<_>>();
524 let sst_count = sst_ids.len();
525 let mut sst_id_to_info = HashMap::with_capacity(sst_count);
526 let sst_info_fetch_batch_size = self.env.opts.hummock_time_travel_sst_info_fetch_batch_size;
527 while !sst_ids.is_empty() {
528 let sst_infos = hummock_sstable_info::Entity::find()
529 .filter(hummock_sstable_info::Column::SstId.is_in(
530 sst_ids.drain(..std::cmp::min(sst_info_fetch_batch_size, sst_ids.len())),
531 ))
532 .all(&sql_store.conn)
533 .await?;
534 for sst_info in sst_infos {
535 let sst_info: SstableInfo = sst_info.sstable_info.to_protobuf().into();
536 sst_id_to_info.insert(sst_info.sst_id, sst_info);
537 }
538 }
539 if sst_count != sst_id_to_info.len() {
540 return Err(Error::TimeTravel(anyhow!(format!(
541 "some SstableInfos not found for epoch {}, version {}",
542 query_epoch, actual_version_id,
543 ))));
544 }
545 refill_version(&mut actual_version, &sst_id_to_info, table_id);
546 timer.observe_duration();
547 Ok(actual_version)
548 }
549
550 pub(crate) async fn write_time_travel_metadata(
551 &self,
552 txn: &DatabaseTransaction,
553 version: Option<&HummockVersion>,
554 delta: HummockVersionDelta,
555 time_travel_table_ids: HashSet<StateTableId>,
556 skip_sst_ids: &HashSet<HummockSstableId>,
557 tables_to_commit: impl Iterator<Item = (&TableId, &CompactionGroupId, u64)>,
558 ) -> Result<Option<HashSet<HummockSstableId>>> {
559 let _timer = self
560 .metrics
561 .time_travel_write_metadata_latency
562 .start_timer();
563 if self
564 .env
565 .system_params_reader()
566 .await
567 .time_travel_retention_ms()
568 == 0
569 {
570 return Ok(None);
571 }
572 async fn write_sstable_infos(
573 mut sst_infos: impl Iterator<Item = &SstableInfo>,
574 txn: &DatabaseTransaction,
575 batch_size: usize,
576 ) -> Result<usize> {
577 let mut count = 0;
578 let mut is_finished = false;
579 while !is_finished {
580 let mut remain = batch_size;
581 let mut batch = vec![];
582 while remain > 0 {
583 let Some(sst_info) = sst_infos.next() else {
584 is_finished = true;
585 break;
586 };
587 batch.push(hummock_sstable_info::ActiveModel {
588 sst_id: Set(sst_info.sst_id),
589 object_id: Set(sst_info.object_id),
590 sstable_info: Set(SstableInfoV2Backend::from(&sst_info.to_protobuf())),
591 });
592 remain -= 1;
593 count += 1;
594 }
595 if batch.is_empty() {
596 break;
597 }
598 hummock_sstable_info::Entity::insert_many(batch)
599 .on_conflict_do_nothing()
600 .exec(txn)
601 .await?;
602 }
603 Ok(count)
604 }
605
606 let mut batch = vec![];
607 for (table_id, _cg_id, committed_epoch) in tables_to_commit {
608 let m = hummock_epoch_to_version::ActiveModel {
609 epoch: Set(committed_epoch.try_into().unwrap()),
610 table_id: Set(i64::from(table_id.as_raw_id())),
611 version_id: Set(delta.id),
612 };
613 batch.push(m);
614 if batch.len()
615 >= self
616 .env
617 .opts
618 .hummock_time_travel_epoch_version_insert_batch_size
619 {
620 hummock_epoch_to_version::Entity::insert_many(std::mem::take(&mut batch))
622 .do_nothing()
623 .exec(txn)
624 .await?;
625 }
626 }
627 if !batch.is_empty() {
628 hummock_epoch_to_version::Entity::insert_many(batch)
630 .do_nothing()
631 .exec(txn)
632 .await?;
633 }
634
635 let mut version_sst_ids = None;
636 if let Some(version) = version {
637 version_sst_ids = Some(
639 version
640 .get_sst_infos()
641 .filter_map(|s| {
642 if s.table_ids
643 .iter()
644 .any(|tid| time_travel_table_ids.contains(tid))
645 {
646 return Some(s.sst_id);
647 }
648 None
649 })
650 .collect(),
651 );
652 write_sstable_infos(
653 version.get_sst_infos().filter(|s| {
654 !skip_sst_ids.contains(&s.sst_id)
655 && s.table_ids
656 .iter()
657 .any(|tid| time_travel_table_ids.contains(tid))
658 }),
659 txn,
660 self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
661 )
662 .await?;
663 let m = hummock_time_travel_version::ActiveModel {
664 version_id: Set(version.id),
665 version: Set(
666 (&IncompleteHummockVersion::from((version, &time_travel_table_ids))
667 .to_protobuf())
668 .into(),
669 ),
670 };
671 hummock_time_travel_version::Entity::insert(m)
672 .on_conflict_do_nothing()
673 .exec(txn)
674 .await?;
675 return Ok(version_sst_ids);
677 }
678 let written = write_sstable_infos(
679 delta.newly_added_sst_infos(true).filter(|s| {
680 !skip_sst_ids.contains(&s.sst_id)
681 && s.table_ids
682 .iter()
683 .any(|tid| time_travel_table_ids.contains(tid))
684 }),
685 txn,
686 self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
687 )
688 .await?;
689 let has_state_table_info_delta = delta
690 .state_table_info_delta
691 .keys()
692 .any(|table_id| time_travel_table_ids.contains(table_id));
693 if written > 0 || has_state_table_info_delta {
694 let m = hummock_time_travel_delta::ActiveModel {
695 version_id: Set(delta.id),
696 version_delta: Set((&IncompleteHummockVersionDelta::from((
697 &delta,
698 &time_travel_table_ids,
699 ))
700 .to_protobuf())
701 .into()),
702 };
703 hummock_time_travel_delta::Entity::insert(m)
704 .on_conflict_do_nothing()
705 .exec(txn)
706 .await?;
707 }
708
709 Ok(version_sst_ids)
710 }
711}
712
713fn replay_archive(
715 version: PbHummockVersion,
716 deltas: impl Iterator<Item = PbHummockVersionDelta>,
717) -> HummockVersion {
718 let mut last_version = HummockVersion::from_persisted_protobuf_owned(version);
721 for d in deltas {
722 let d = HummockVersionDelta::from_persisted_protobuf_owned(d);
723 debug_assert!(
724 !should_mark_next_time_travel_version_snapshot(&d),
725 "unexpected time travel delta {:?}",
726 d
727 );
728 while last_version.id < d.prev_id {
731 last_version.id += 1;
732 }
733 last_version.apply_version_delta(&d);
734 }
735 last_version
736}
737
738pub fn require_sql_meta_store_err() -> Error {
739 Error::TimeTravel(anyhow!("require SQL meta store"))
740}
741
742pub fn should_mark_next_time_travel_version_snapshot(delta: &HummockVersionDelta) -> bool {
744 delta.group_deltas.iter().any(|(_, deltas)| {
745 deltas
746 .group_deltas
747 .iter()
748 .any(|d| !matches!(d, GroupDeltaCommon::NewL0SubLevel(_)))
749 })
750}