risingwave_meta/hummock/manager/
time_travel.rs1use std::collections::{HashMap, HashSet, VecDeque};
16
17use anyhow::anyhow;
18use futures::TryStreamExt;
19use risingwave_common::catalog::TableId;
20use risingwave_common::system_param::reader::SystemParamsRead;
21use risingwave_common::util::epoch::Epoch;
22use risingwave_hummock_sdk::compaction_group::StateTableId;
23use risingwave_hummock_sdk::sstable_info::SstableInfo;
24use risingwave_hummock_sdk::time_travel::{
25 IncompleteHummockVersion, IncompleteHummockVersionDelta, refill_version,
26};
27use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion, HummockVersionDelta};
28use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, HummockObjectId, HummockSstableId};
29use risingwave_meta_model::hummock_sstable_info::SstableInfoV2Backend;
30use risingwave_meta_model::{
31 HummockVersionId, hummock_epoch_to_version, hummock_sstable_info, hummock_time_travel_delta,
32 hummock_time_travel_version,
33};
34use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta};
35use sea_orm::ActiveValue::Set;
36use sea_orm::{
37 ColumnTrait, Condition, DatabaseTransaction, EntityTrait, PaginatorTrait, QueryFilter,
38 QueryOrder, QuerySelect, TransactionTrait,
39};
40use tracing::info;
41
42use crate::hummock::HummockManager;
43use crate::hummock::error::{Error, Result};
44
45impl HummockManager {
47 pub(crate) async fn init_time_travel_state(&self) -> Result<()> {
48 let sql_store = self.env.meta_store_ref();
49 let mut guard = self.versioning.write().await;
50 guard.mark_next_time_travel_version_snapshot();
51
52 guard.last_time_travel_snapshot_sst_ids = HashSet::new();
53 let Some(version) = hummock_time_travel_version::Entity::find()
54 .order_by_desc(hummock_time_travel_version::Column::VersionId)
55 .one(&sql_store.conn)
56 .await?
57 .map(|v| IncompleteHummockVersion::from_persisted_protobuf(&v.version.to_protobuf()))
58 else {
59 return Ok(());
60 };
61 guard.last_time_travel_snapshot_sst_ids = version.get_sst_ids(true);
62 Ok(())
63 }
64
65 pub(crate) async fn truncate_time_travel_metadata(
66 &self,
67 epoch_watermark: HummockEpoch,
68 ) -> Result<()> {
69 let _timer = self
70 .metrics
71 .time_travel_vacuum_metadata_latency
72 .start_timer();
73 let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
74 let sql_store = self.env.meta_store_ref();
75 let txn = sql_store.conn.begin().await?;
76 let version_watermark = hummock_epoch_to_version::Entity::find()
77 .filter(
78 hummock_epoch_to_version::Column::Epoch
79 .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
80 )
81 .order_by_desc(hummock_epoch_to_version::Column::Epoch)
82 .order_by_asc(hummock_epoch_to_version::Column::VersionId)
83 .one(&txn)
84 .await?;
85 let Some(version_watermark) = version_watermark else {
86 txn.commit().await?;
87 return Ok(());
88 };
89 let mut watermark_version_id =
90 std::cmp::min(version_watermark.version_id, min_pinned_version_id);
91 if let Some(max_version_count) = self.env.opts.time_travel_vacuum_max_version_count
92 && let Some(earliest_version_id) = hummock_time_travel_version::Entity::find()
93 .select_only()
94 .column(hummock_time_travel_version::Column::VersionId)
95 .order_by_asc(hummock_time_travel_version::Column::VersionId)
96 .into_tuple::<HummockVersionId>()
97 .one(&txn)
98 .await?
99 {
100 watermark_version_id = std::cmp::min(
101 watermark_version_id,
102 HummockVersionId::new(
103 earliest_version_id
104 .as_raw_id()
105 .saturating_add(max_version_count.into()),
106 ),
107 );
108 }
109 let res = hummock_epoch_to_version::Entity::delete_many()
110 .filter(
111 hummock_epoch_to_version::Column::Epoch
112 .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
113 )
114 .exec(&txn)
115 .await?;
116 tracing::info!(
117 epoch_watermark,
118 "Delete {} rows from hummock_epoch_to_version.",
119 res.rows_affected
120 );
121 let latest_valid_version = hummock_time_travel_version::Entity::find()
122 .filter(hummock_time_travel_version::Column::VersionId.lte(watermark_version_id))
123 .order_by_desc(hummock_time_travel_version::Column::VersionId)
124 .one(&txn)
125 .await?
126 .map(|m| IncompleteHummockVersion::from_persisted_protobuf(&m.version.to_protobuf()));
127 let Some(latest_valid_version) = latest_valid_version else {
128 txn.commit().await?;
129 return Ok(());
130 };
131 let (
132 latest_valid_version_id,
133 latest_valid_version_sst_ids,
134 latest_valid_version_object_ids,
135 ) = {
136 (
137 latest_valid_version.id,
138 latest_valid_version.get_sst_ids(true),
139 latest_valid_version
140 .get_object_ids(true)
141 .collect::<HashSet<_>>(),
142 )
143 };
144 let mut object_ids_to_delete: HashSet<_> = HashSet::default();
145 let version_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
146 hummock_time_travel_version::Entity::find()
147 .select_only()
148 .column(hummock_time_travel_version::Column::VersionId)
149 .filter(hummock_time_travel_version::Column::VersionId.lt(latest_valid_version_id))
150 .order_by_desc(hummock_time_travel_version::Column::VersionId)
151 .into_tuple()
152 .all(&txn)
153 .await?;
154 let delta_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
155 hummock_time_travel_delta::Entity::find()
156 .select_only()
157 .column(hummock_time_travel_delta::Column::VersionId)
158 .filter(hummock_time_travel_delta::Column::VersionId.lt(latest_valid_version_id))
159 .into_tuple()
160 .all(&txn)
161 .await?;
162 let delete_sst_batch_size = self
164 .env
165 .opts
166 .hummock_time_travel_epoch_version_insert_batch_size;
167 let mut sst_ids_to_delete: HashSet<_> = HashSet::default();
168 async fn delete_sst_in_batch(
169 txn: &DatabaseTransaction,
170 sst_ids_to_delete: HashSet<HummockSstableId>,
171 delete_sst_batch_size: usize,
172 ) -> Result<()> {
173 for start_idx in 0..=(sst_ids_to_delete.len().saturating_sub(1) / delete_sst_batch_size)
174 {
175 hummock_sstable_info::Entity::delete_many()
176 .filter(
177 hummock_sstable_info::Column::SstId.is_in(
178 sst_ids_to_delete
179 .iter()
180 .skip(start_idx * delete_sst_batch_size)
181 .take(delete_sst_batch_size)
182 .copied(),
183 ),
184 )
185 .exec(txn)
186 .await?;
187 }
188 Ok(())
189 }
190 for delta_id_to_delete in delta_ids_to_delete {
191 let delta_to_delete = hummock_time_travel_delta::Entity::find_by_id(delta_id_to_delete)
192 .one(&txn)
193 .await?
194 .ok_or_else(|| {
195 Error::TimeTravel(anyhow!(format!(
196 "version delta {} not found",
197 delta_id_to_delete
198 )))
199 })?;
200 let delta_to_delete = IncompleteHummockVersionDelta::from_persisted_protobuf(
201 &delta_to_delete.version_delta.to_protobuf(),
202 );
203 let new_sst_ids = delta_to_delete.newly_added_sst_ids(true);
204 sst_ids_to_delete.extend(&new_sst_ids - &latest_valid_version_sst_ids);
206 if sst_ids_to_delete.len() >= delete_sst_batch_size {
207 delete_sst_in_batch(
208 &txn,
209 std::mem::take(&mut sst_ids_to_delete),
210 delete_sst_batch_size,
211 )
212 .await?;
213 }
214 let new_object_ids = delta_to_delete.newly_added_object_ids(true);
215 object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
216 }
217 let mut next_version_sst_ids = latest_valid_version_sst_ids;
218 for prev_version_id in version_ids_to_delete {
219 let prev_version = {
220 let prev_version = hummock_time_travel_version::Entity::find_by_id(prev_version_id)
221 .one(&txn)
222 .await?
223 .ok_or_else(|| {
224 Error::TimeTravel(anyhow!(format!(
225 "prev_version {} not found",
226 prev_version_id
227 )))
228 })?;
229 IncompleteHummockVersion::from_persisted_protobuf(
230 &prev_version.version.to_protobuf(),
231 )
232 };
233 let sst_ids = prev_version.get_sst_ids(true);
234 sst_ids_to_delete.extend(&sst_ids - &next_version_sst_ids);
236 if sst_ids_to_delete.len() >= delete_sst_batch_size {
237 delete_sst_in_batch(
238 &txn,
239 std::mem::take(&mut sst_ids_to_delete),
240 delete_sst_batch_size,
241 )
242 .await?;
243 }
244 let new_object_ids: HashSet<_> = prev_version.get_object_ids(true).collect();
245 object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
246 next_version_sst_ids = sst_ids;
247 }
248 if !sst_ids_to_delete.is_empty() {
249 delete_sst_in_batch(&txn, sst_ids_to_delete, delete_sst_batch_size).await?;
250 }
251
252 if !object_ids_to_delete.is_empty() {
253 self.gc_manager
256 .add_may_delete_object_ids(object_ids_to_delete.into_iter());
257 }
258
259 let res = hummock_time_travel_version::Entity::delete_many()
260 .filter(hummock_time_travel_version::Column::VersionId.lt(latest_valid_version_id))
261 .exec(&txn)
262 .await?;
263 tracing::info!(
264 %watermark_version_id,
265 %latest_valid_version_id,
266 "Deleted {} rows from hummock_time_travel_version.",
267 res.rows_affected
268 );
269
270 let res = hummock_time_travel_delta::Entity::delete_many()
271 .filter(hummock_time_travel_delta::Column::VersionId.lt(latest_valid_version_id))
272 .exec(&txn)
273 .await?;
274 tracing::info!(
275 %watermark_version_id,
276 %latest_valid_version_id,
277 "Deleted {} rows from hummock_time_travel_delta.",
278 res.rows_affected
279 );
280
281 txn.commit().await?;
282 Ok(())
283 }
284
285 pub(crate) async fn filter_out_objects_by_time_travel_v1(
286 &self,
287 objects: impl Iterator<Item = HummockObjectId>,
288 ) -> Result<HashSet<HummockObjectId>> {
289 let batch_size = self
290 .env
291 .opts
292 .hummock_time_travel_filter_out_objects_batch_size;
293 info!("filter out objects by time travel v1, only sst will remain in the result set");
294 let mut result: HashSet<_> = objects
297 .filter(|object_id| match object_id {
298 HummockObjectId::Sstable(_) => true,
299 HummockObjectId::VectorFile(_) | HummockObjectId::HnswGraphFile(_) => false,
300 })
301 .collect();
302 let mut remain_sst: VecDeque<_> = result.iter().copied().collect();
303 while !remain_sst.is_empty() {
304 let batch = remain_sst
305 .drain(..std::cmp::min(remain_sst.len(), batch_size))
306 .map(|object_id| object_id.as_raw());
307 let reject_object_ids: Vec<risingwave_meta_model::HummockSstableObjectId> =
308 hummock_sstable_info::Entity::find()
309 .filter(hummock_sstable_info::Column::ObjectId.is_in(batch))
310 .select_only()
311 .column(hummock_sstable_info::Column::ObjectId)
312 .into_tuple()
313 .all(&self.env.meta_store_ref().conn)
314 .await?;
315 for reject in reject_object_ids {
316 let object_id = HummockObjectId::Sstable(reject);
317 result.remove(&object_id);
318 }
319 }
320 Ok(result)
321 }
322
323 pub(crate) async fn filter_out_objects_by_time_travel(
324 &self,
325 objects: impl Iterator<Item = HummockObjectId>,
326 ) -> Result<HashSet<HummockObjectId>> {
327 if self.env.opts.hummock_time_travel_filter_out_objects_v1 {
328 return self.filter_out_objects_by_time_travel_v1(objects).await;
329 }
330 let mut result: HashSet<_> = objects.collect();
331
332 {
334 let mut prev_version_id: Option<HummockVersionId> = None;
335 loop {
336 let query = hummock_time_travel_version::Entity::find();
337 let query = if let Some(prev_version_id) = prev_version_id {
338 query.filter(hummock_time_travel_version::Column::VersionId.gt(prev_version_id))
339 } else {
340 query
341 };
342 let mut version_stream = query
343 .order_by_asc(hummock_time_travel_version::Column::VersionId)
344 .limit(
345 self.env
346 .opts
347 .hummock_time_travel_filter_out_objects_list_version_batch_size
348 as u64,
349 )
350 .stream(&self.env.meta_store_ref().conn)
351 .await?;
352 let mut next_prev_version_id = None;
353 while let Some(model) = version_stream.try_next().await? {
354 let version =
355 HummockVersion::from_persisted_protobuf(&model.version.to_protobuf());
356 for object_id in version.get_object_ids(true) {
357 result.remove(&object_id);
358 }
359 next_prev_version_id = Some(model.version_id);
360 }
361 if let Some(next_prev_version_id) = next_prev_version_id {
362 prev_version_id = Some(next_prev_version_id);
363 } else {
364 break;
365 }
366 }
367 }
368
369 {
371 let mut prev_version_id: Option<HummockVersionId> = None;
372 loop {
373 let query = hummock_time_travel_delta::Entity::find();
374 let query = if let Some(prev_version_id) = prev_version_id {
375 query.filter(hummock_time_travel_delta::Column::VersionId.gt(prev_version_id))
376 } else {
377 query
378 };
379 let mut version_stream = query
380 .order_by_asc(hummock_time_travel_delta::Column::VersionId)
381 .limit(
382 self.env
383 .opts
384 .hummock_time_travel_filter_out_objects_list_delta_batch_size
385 as u64,
386 )
387 .stream(&self.env.meta_store_ref().conn)
388 .await?;
389 let mut next_prev_version_id = None;
390 while let Some(model) = version_stream.try_next().await? {
391 let version_delta = HummockVersionDelta::from_persisted_protobuf(
392 &model.version_delta.to_protobuf(),
393 );
394 for object_id in version_delta.newly_added_object_ids(true) {
396 result.remove(&object_id);
397 }
398 next_prev_version_id = Some(model.version_id);
399 }
400 if let Some(next_prev_version_id) = next_prev_version_id {
401 prev_version_id = Some(next_prev_version_id);
402 } else {
403 break;
404 }
405 }
406 }
407
408 Ok(result)
409 }
410
411 pub(crate) async fn time_travel_pinned_object_count(&self) -> Result<u64> {
412 let count = hummock_sstable_info::Entity::find()
413 .count(&self.env.meta_store_ref().conn)
414 .await?;
415 Ok(count)
416 }
417
418 pub async fn epoch_to_version(
424 &self,
425 query_epoch: HummockEpoch,
426 table_id: TableId,
427 ) -> Result<HummockVersion> {
428 let sql_store = self.env.meta_store_ref();
429 let _permit = self.inflight_time_travel_query.try_acquire().map_err(|_| {
430 anyhow!(format!(
431 "too many inflight time travel queries, max_inflight_time_travel_query={}",
432 self.env.opts.max_inflight_time_travel_query
433 ))
434 })?;
435 let epoch_to_version = hummock_epoch_to_version::Entity::find()
436 .filter(
437 Condition::any()
438 .add(
439 hummock_epoch_to_version::Column::TableId
440 .eq(i64::from(table_id.as_raw_id())),
441 )
442 .add(hummock_epoch_to_version::Column::TableId.eq(0)),
444 )
445 .filter(
446 hummock_epoch_to_version::Column::Epoch
447 .lte(risingwave_meta_model::Epoch::try_from(query_epoch).unwrap()),
448 )
449 .order_by_desc(hummock_epoch_to_version::Column::Epoch)
450 .one(&sql_store.conn)
451 .await?
452 .ok_or_else(|| {
453 Error::TimeTravel(anyhow!(format!(
454 "version not found for epoch {}",
455 query_epoch
456 )))
457 })?;
458 let timer = self
459 .metrics
460 .time_travel_version_replay_latency
461 .start_timer();
462 let actual_version_id = epoch_to_version.version_id;
463 tracing::debug!(
464 query_epoch,
465 query_tz = ?(Epoch(query_epoch).as_timestamptz()),
466 actual_epoch = epoch_to_version.epoch,
467 actual_tz = ?(Epoch(u64::try_from(epoch_to_version.epoch).unwrap()).as_timestamptz()),
468 %actual_version_id,
469 "convert query epoch"
470 );
471
472 let replay_version = hummock_time_travel_version::Entity::find()
473 .filter(hummock_time_travel_version::Column::VersionId.lte(actual_version_id))
474 .order_by_desc(hummock_time_travel_version::Column::VersionId)
475 .one(&sql_store.conn)
476 .await?
477 .ok_or_else(|| {
478 Error::TimeTravel(anyhow!(format!(
479 "no replay version found for epoch {}, version {}",
480 query_epoch, actual_version_id,
481 )))
482 })?;
483 let deltas = hummock_time_travel_delta::Entity::find()
484 .filter(hummock_time_travel_delta::Column::VersionId.gt(replay_version.version_id))
485 .filter(hummock_time_travel_delta::Column::VersionId.lte(actual_version_id))
486 .order_by_asc(hummock_time_travel_delta::Column::VersionId)
487 .all(&sql_store.conn)
488 .await?;
489 let mut actual_version = replay_archive(
491 replay_version.version.to_protobuf(),
492 deltas.into_iter().map(|d| d.version_delta.to_protobuf()),
493 );
494
495 let mut sst_ids = actual_version
496 .get_sst_ids(true)
497 .into_iter()
498 .collect::<VecDeque<_>>();
499 let sst_count = sst_ids.len();
500 let mut sst_id_to_info = HashMap::with_capacity(sst_count);
501 let sst_info_fetch_batch_size = self.env.opts.hummock_time_travel_sst_info_fetch_batch_size;
502 while !sst_ids.is_empty() {
503 let sst_infos = hummock_sstable_info::Entity::find()
504 .filter(hummock_sstable_info::Column::SstId.is_in(
505 sst_ids.drain(..std::cmp::min(sst_info_fetch_batch_size, sst_ids.len())),
506 ))
507 .all(&sql_store.conn)
508 .await?;
509 for sst_info in sst_infos {
510 let sst_info: SstableInfo = sst_info.sstable_info.to_protobuf().into();
511 sst_id_to_info.insert(sst_info.sst_id, sst_info);
512 }
513 }
514 if sst_count != sst_id_to_info.len() {
515 return Err(Error::TimeTravel(anyhow!(format!(
516 "some SstableInfos not found for epoch {}, version {}",
517 query_epoch, actual_version_id,
518 ))));
519 }
520 refill_version(&mut actual_version, &sst_id_to_info, table_id);
521 timer.observe_duration();
522 Ok(actual_version)
523 }
524
525 pub(crate) async fn write_time_travel_metadata(
526 &self,
527 txn: &DatabaseTransaction,
528 version: Option<&HummockVersion>,
529 delta: HummockVersionDelta,
530 time_travel_table_ids: HashSet<StateTableId>,
531 skip_sst_ids: &HashSet<HummockSstableId>,
532 tables_to_commit: impl Iterator<Item = (&TableId, &CompactionGroupId, u64)>,
533 ) -> Result<Option<HashSet<HummockSstableId>>> {
534 let _timer = self
535 .metrics
536 .time_travel_write_metadata_latency
537 .start_timer();
538 if self
539 .env
540 .system_params_reader()
541 .await
542 .time_travel_retention_ms()
543 == 0
544 {
545 return Ok(None);
546 }
547 async fn write_sstable_infos(
548 mut sst_infos: impl Iterator<Item = &SstableInfo>,
549 txn: &DatabaseTransaction,
550 batch_size: usize,
551 ) -> Result<usize> {
552 let mut count = 0;
553 let mut is_finished = false;
554 while !is_finished {
555 let mut remain = batch_size;
556 let mut batch = vec![];
557 while remain > 0 {
558 let Some(sst_info) = sst_infos.next() else {
559 is_finished = true;
560 break;
561 };
562 batch.push(hummock_sstable_info::ActiveModel {
563 sst_id: Set(sst_info.sst_id),
564 object_id: Set(sst_info.object_id),
565 sstable_info: Set(SstableInfoV2Backend::from(&sst_info.to_protobuf())),
566 });
567 remain -= 1;
568 count += 1;
569 }
570 if batch.is_empty() {
571 break;
572 }
573 hummock_sstable_info::Entity::insert_many(batch)
574 .on_conflict_do_nothing()
575 .exec(txn)
576 .await?;
577 }
578 Ok(count)
579 }
580
581 let mut batch = vec![];
582 for (table_id, _cg_id, committed_epoch) in tables_to_commit {
583 let m = hummock_epoch_to_version::ActiveModel {
584 epoch: Set(committed_epoch.try_into().unwrap()),
585 table_id: Set(i64::from(table_id.as_raw_id())),
586 version_id: Set(delta.id),
587 };
588 batch.push(m);
589 if batch.len()
590 >= self
591 .env
592 .opts
593 .hummock_time_travel_epoch_version_insert_batch_size
594 {
595 hummock_epoch_to_version::Entity::insert_many(std::mem::take(&mut batch))
597 .do_nothing()
598 .exec(txn)
599 .await?;
600 }
601 }
602 if !batch.is_empty() {
603 hummock_epoch_to_version::Entity::insert_many(batch)
605 .do_nothing()
606 .exec(txn)
607 .await?;
608 }
609
610 let mut version_sst_ids = None;
611 if let Some(version) = version {
612 version_sst_ids = Some(
614 version
615 .get_sst_infos(true)
616 .filter_map(|s| {
617 if s.table_ids
618 .iter()
619 .any(|tid| time_travel_table_ids.contains(tid))
620 {
621 return Some(s.sst_id);
622 }
623 None
624 })
625 .collect(),
626 );
627 write_sstable_infos(
628 version.get_sst_infos(true).filter(|s| {
629 !skip_sst_ids.contains(&s.sst_id)
630 && s.table_ids
631 .iter()
632 .any(|tid| time_travel_table_ids.contains(tid))
633 }),
634 txn,
635 self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
636 )
637 .await?;
638 let m = hummock_time_travel_version::ActiveModel {
639 version_id: Set(version.id),
640 version: Set(
641 (&IncompleteHummockVersion::from((version, &time_travel_table_ids))
642 .to_protobuf())
643 .into(),
644 ),
645 };
646 hummock_time_travel_version::Entity::insert(m)
647 .on_conflict_do_nothing()
648 .exec(txn)
649 .await?;
650 return Ok(version_sst_ids);
652 }
653 let written = write_sstable_infos(
654 delta.newly_added_sst_infos(true).filter(|s| {
655 !skip_sst_ids.contains(&s.sst_id)
656 && s.table_ids
657 .iter()
658 .any(|tid| time_travel_table_ids.contains(tid))
659 }),
660 txn,
661 self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
662 )
663 .await?;
664 let has_state_table_info_delta = delta
665 .state_table_info_delta
666 .keys()
667 .any(|table_id| time_travel_table_ids.contains(table_id));
668 if written > 0 || has_state_table_info_delta {
669 let m = hummock_time_travel_delta::ActiveModel {
670 version_id: Set(delta.id),
671 version_delta: Set((&IncompleteHummockVersionDelta::from((
672 &delta,
673 &time_travel_table_ids,
674 ))
675 .to_protobuf())
676 .into()),
677 };
678 hummock_time_travel_delta::Entity::insert(m)
679 .on_conflict_do_nothing()
680 .exec(txn)
681 .await?;
682 }
683
684 Ok(version_sst_ids)
685 }
686}
687
688fn replay_archive(
690 version: PbHummockVersion,
691 deltas: impl Iterator<Item = PbHummockVersionDelta>,
692) -> HummockVersion {
693 let mut last_version = HummockVersion::from_persisted_protobuf(&version);
696 for d in deltas {
697 let d = HummockVersionDelta::from_persisted_protobuf(&d);
698 debug_assert!(
699 !should_mark_next_time_travel_version_snapshot(&d),
700 "unexpected time travel delta {:?}",
701 d
702 );
703 while last_version.id < d.prev_id {
706 last_version.id += 1;
707 }
708 last_version.apply_version_delta(&d);
709 }
710 last_version
711}
712
713pub fn require_sql_meta_store_err() -> Error {
714 Error::TimeTravel(anyhow!("require SQL meta store"))
715}
716
717pub fn should_mark_next_time_travel_version_snapshot(delta: &HummockVersionDelta) -> bool {
719 delta.group_deltas.iter().any(|(_, deltas)| {
720 deltas
721 .group_deltas
722 .iter()
723 .any(|d| !matches!(d, GroupDeltaCommon::NewL0SubLevel(_)))
724 })
725}