risingwave_meta/hummock/manager/
time_travel.rs1use std::collections::{HashMap, HashSet, VecDeque};
16
17use anyhow::anyhow;
18use futures::TryStreamExt;
19use risingwave_common::catalog::TableId;
20use risingwave_common::system_param::reader::SystemParamsRead;
21use risingwave_common::util::epoch::Epoch;
22use risingwave_hummock_sdk::compaction_group::StateTableId;
23use risingwave_hummock_sdk::sstable_info::SstableInfo;
24use risingwave_hummock_sdk::time_travel::{
25 IncompleteHummockVersion, IncompleteHummockVersionDelta, refill_version,
26};
27use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion, HummockVersionDelta};
28use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, HummockObjectId, HummockSstableId};
29use risingwave_meta_model::hummock_sstable_info::SstableInfoV2Backend;
30use risingwave_meta_model::{
31 HummockVersionId, hummock_epoch_to_version, hummock_sstable_info, hummock_time_travel_delta,
32 hummock_time_travel_version,
33};
34use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta};
35use sea_orm::ActiveValue::Set;
36use sea_orm::{
37 ColumnTrait, Condition, DatabaseTransaction, EntityTrait, PaginatorTrait, QueryFilter,
38 QueryOrder, QuerySelect, TransactionTrait,
39};
40use tracing::info;
41
42use crate::hummock::HummockManager;
43use crate::hummock::error::{Error, Result};
44
45impl HummockManager {
47 pub(crate) async fn init_time_travel_state(&self) -> Result<()> {
48 let sql_store = self.env.meta_store_ref();
49 let mut guard = self.versioning.write().await;
50 guard.mark_next_time_travel_version_snapshot();
51
52 guard.last_time_travel_snapshot_sst_ids = HashSet::new();
53 let Some(version) = hummock_time_travel_version::Entity::find()
54 .order_by_desc(hummock_time_travel_version::Column::VersionId)
55 .one(&sql_store.conn)
56 .await?
57 .map(|v| {
58 IncompleteHummockVersion::from_persisted_protobuf_owned(v.version.to_protobuf())
59 })
60 else {
61 return Ok(());
62 };
63 guard.last_time_travel_snapshot_sst_ids = version.get_sst_ids(true);
64 Ok(())
65 }
66
67 pub(crate) async fn truncate_time_travel_metadata(
68 &self,
69 epoch_watermark: HummockEpoch,
70 ) -> Result<()> {
71 let _timer = self
72 .metrics
73 .time_travel_vacuum_metadata_latency
74 .start_timer();
75 let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
76 let sql_store = self.env.meta_store_ref();
77 let txn = sql_store.conn.begin().await?;
78 let version_watermark = hummock_epoch_to_version::Entity::find()
79 .filter(
80 hummock_epoch_to_version::Column::Epoch
81 .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
82 )
83 .order_by_desc(hummock_epoch_to_version::Column::Epoch)
84 .order_by_asc(hummock_epoch_to_version::Column::VersionId)
85 .one(&txn)
86 .await?;
87 let Some(version_watermark) = version_watermark else {
88 txn.commit().await?;
89 return Ok(());
90 };
91 let mut watermark_version_id =
92 std::cmp::min(version_watermark.version_id, min_pinned_version_id);
93 if let Some(max_version_count) = self.env.opts.time_travel_vacuum_max_version_count
94 && let Some(earliest_version_id) = hummock_time_travel_version::Entity::find()
95 .select_only()
96 .column(hummock_time_travel_version::Column::VersionId)
97 .order_by_asc(hummock_time_travel_version::Column::VersionId)
98 .into_tuple::<HummockVersionId>()
99 .one(&txn)
100 .await?
101 {
102 watermark_version_id = std::cmp::min(
103 watermark_version_id,
104 HummockVersionId::new(
105 earliest_version_id
106 .as_raw_id()
107 .saturating_add(max_version_count.into()),
108 ),
109 );
110 }
111 let res = hummock_epoch_to_version::Entity::delete_many()
112 .filter(
113 hummock_epoch_to_version::Column::Epoch
114 .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
115 )
116 .exec(&txn)
117 .await?;
118 tracing::info!(
119 epoch_watermark,
120 "Delete {} rows from hummock_epoch_to_version.",
121 res.rows_affected
122 );
123 let latest_valid_version = hummock_time_travel_version::Entity::find()
124 .filter(hummock_time_travel_version::Column::VersionId.lte(watermark_version_id))
125 .order_by_desc(hummock_time_travel_version::Column::VersionId)
126 .one(&txn)
127 .await?
128 .map(|m| {
129 IncompleteHummockVersion::from_persisted_protobuf_owned(m.version.to_protobuf())
130 });
131 let Some(latest_valid_version) = latest_valid_version else {
132 txn.commit().await?;
133 return Ok(());
134 };
135 let (
136 latest_valid_version_id,
137 latest_valid_version_sst_ids,
138 latest_valid_version_object_ids,
139 ) = {
140 (
141 latest_valid_version.id,
142 latest_valid_version.get_sst_ids(true),
143 latest_valid_version
144 .get_object_ids(true)
145 .collect::<HashSet<_>>(),
146 )
147 };
148 let mut object_ids_to_delete: HashSet<_> = HashSet::default();
149 let version_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
150 hummock_time_travel_version::Entity::find()
151 .select_only()
152 .column(hummock_time_travel_version::Column::VersionId)
153 .filter(hummock_time_travel_version::Column::VersionId.lt(latest_valid_version_id))
154 .order_by_desc(hummock_time_travel_version::Column::VersionId)
155 .into_tuple()
156 .all(&txn)
157 .await?;
158 let delta_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
159 hummock_time_travel_delta::Entity::find()
160 .select_only()
161 .column(hummock_time_travel_delta::Column::VersionId)
162 .filter(hummock_time_travel_delta::Column::VersionId.lt(latest_valid_version_id))
163 .into_tuple()
164 .all(&txn)
165 .await?;
166 let delete_sst_batch_size = self
168 .env
169 .opts
170 .hummock_time_travel_epoch_version_insert_batch_size;
171 let mut sst_ids_to_delete: HashSet<_> = HashSet::default();
172 async fn delete_sst_in_batch(
173 txn: &DatabaseTransaction,
174 sst_ids_to_delete: HashSet<HummockSstableId>,
175 delete_sst_batch_size: usize,
176 ) -> Result<()> {
177 for start_idx in 0..=(sst_ids_to_delete.len().saturating_sub(1) / delete_sst_batch_size)
178 {
179 hummock_sstable_info::Entity::delete_many()
180 .filter(
181 hummock_sstable_info::Column::SstId.is_in(
182 sst_ids_to_delete
183 .iter()
184 .skip(start_idx * delete_sst_batch_size)
185 .take(delete_sst_batch_size)
186 .copied(),
187 ),
188 )
189 .exec(txn)
190 .await?;
191 }
192 Ok(())
193 }
194 for delta_id_to_delete in delta_ids_to_delete {
195 let delta_to_delete = hummock_time_travel_delta::Entity::find_by_id(delta_id_to_delete)
196 .one(&txn)
197 .await?
198 .ok_or_else(|| {
199 Error::TimeTravel(anyhow!(format!(
200 "version delta {} not found",
201 delta_id_to_delete
202 )))
203 })?;
204 let delta_to_delete = IncompleteHummockVersionDelta::from_persisted_protobuf_owned(
205 delta_to_delete.version_delta.to_protobuf(),
206 );
207 let new_sst_ids = delta_to_delete.newly_added_sst_ids(true);
208 sst_ids_to_delete.extend(&new_sst_ids - &latest_valid_version_sst_ids);
210 if sst_ids_to_delete.len() >= delete_sst_batch_size {
211 delete_sst_in_batch(
212 &txn,
213 std::mem::take(&mut sst_ids_to_delete),
214 delete_sst_batch_size,
215 )
216 .await?;
217 }
218 let new_object_ids = delta_to_delete.newly_added_object_ids(true);
219 object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
220 }
221 let mut next_version_sst_ids = latest_valid_version_sst_ids;
222 for prev_version_id in version_ids_to_delete {
223 let prev_version = {
224 let prev_version = hummock_time_travel_version::Entity::find_by_id(prev_version_id)
225 .one(&txn)
226 .await?
227 .ok_or_else(|| {
228 Error::TimeTravel(anyhow!(format!(
229 "prev_version {} not found",
230 prev_version_id
231 )))
232 })?;
233 IncompleteHummockVersion::from_persisted_protobuf_owned(
234 prev_version.version.to_protobuf(),
235 )
236 };
237 let sst_ids = prev_version.get_sst_ids(true);
238 sst_ids_to_delete.extend(&sst_ids - &next_version_sst_ids);
240 if sst_ids_to_delete.len() >= delete_sst_batch_size {
241 delete_sst_in_batch(
242 &txn,
243 std::mem::take(&mut sst_ids_to_delete),
244 delete_sst_batch_size,
245 )
246 .await?;
247 }
248 let new_object_ids: HashSet<_> = prev_version.get_object_ids(true).collect();
249 object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
250 next_version_sst_ids = sst_ids;
251 }
252 if !sst_ids_to_delete.is_empty() {
253 delete_sst_in_batch(&txn, sst_ids_to_delete, delete_sst_batch_size).await?;
254 }
255
256 if !object_ids_to_delete.is_empty() {
257 self.gc_manager
260 .add_may_delete_object_ids(object_ids_to_delete.into_iter());
261 }
262
263 let res = hummock_time_travel_version::Entity::delete_many()
264 .filter(hummock_time_travel_version::Column::VersionId.lt(latest_valid_version_id))
265 .exec(&txn)
266 .await?;
267 tracing::info!(
268 %watermark_version_id,
269 %latest_valid_version_id,
270 "Deleted {} rows from hummock_time_travel_version.",
271 res.rows_affected
272 );
273
274 let res = hummock_time_travel_delta::Entity::delete_many()
275 .filter(hummock_time_travel_delta::Column::VersionId.lt(latest_valid_version_id))
276 .exec(&txn)
277 .await?;
278 tracing::info!(
279 %watermark_version_id,
280 %latest_valid_version_id,
281 "Deleted {} rows from hummock_time_travel_delta.",
282 res.rows_affected
283 );
284
285 txn.commit().await?;
286 Ok(())
287 }
288
289 pub(crate) async fn filter_out_objects_by_time_travel_v1(
290 &self,
291 objects: impl Iterator<Item = HummockObjectId>,
292 ) -> Result<HashSet<HummockObjectId>> {
293 let batch_size = self
294 .env
295 .opts
296 .hummock_time_travel_filter_out_objects_batch_size;
297 info!("filter out objects by time travel v1, only sst will remain in the result set");
298 let mut result: HashSet<_> = objects
301 .filter(|object_id| match object_id {
302 HummockObjectId::Sstable(_) => true,
303 HummockObjectId::VectorFile(_) | HummockObjectId::HnswGraphFile(_) => false,
304 })
305 .collect();
306 let mut remain_sst: VecDeque<_> = result.iter().copied().collect();
307 while !remain_sst.is_empty() {
308 let batch = remain_sst
309 .drain(..std::cmp::min(remain_sst.len(), batch_size))
310 .map(|object_id| object_id.as_raw());
311 let reject_object_ids: Vec<risingwave_meta_model::HummockSstableObjectId> =
312 hummock_sstable_info::Entity::find()
313 .filter(hummock_sstable_info::Column::ObjectId.is_in(batch))
314 .select_only()
315 .column(hummock_sstable_info::Column::ObjectId)
316 .into_tuple()
317 .all(&self.env.meta_store_ref().conn)
318 .await?;
319 for reject in reject_object_ids {
320 let object_id = HummockObjectId::Sstable(reject);
321 result.remove(&object_id);
322 }
323 }
324 Ok(result)
325 }
326
327 pub(crate) async fn filter_out_objects_by_time_travel(
328 &self,
329 objects: impl Iterator<Item = HummockObjectId>,
330 ) -> Result<HashSet<HummockObjectId>> {
331 if self.env.opts.hummock_time_travel_filter_out_objects_v1 {
332 return self.filter_out_objects_by_time_travel_v1(objects).await;
333 }
334 let mut result: HashSet<_> = objects.collect();
335
336 {
338 let mut prev_version_id: Option<HummockVersionId> = None;
339 loop {
340 let query = hummock_time_travel_version::Entity::find();
341 let query = if let Some(prev_version_id) = prev_version_id {
342 query.filter(hummock_time_travel_version::Column::VersionId.gt(prev_version_id))
343 } else {
344 query
345 };
346 let mut version_stream = query
347 .order_by_asc(hummock_time_travel_version::Column::VersionId)
348 .limit(
349 self.env
350 .opts
351 .hummock_time_travel_filter_out_objects_list_version_batch_size
352 as u64,
353 )
354 .stream(&self.env.meta_store_ref().conn)
355 .await?;
356 let mut next_prev_version_id = None;
357 while let Some(model) = version_stream.try_next().await? {
358 let version =
359 HummockVersion::from_persisted_protobuf_owned(model.version.to_protobuf());
360 for object_id in version.get_object_ids(true) {
361 result.remove(&object_id);
362 }
363 next_prev_version_id = Some(model.version_id);
364 }
365 if let Some(next_prev_version_id) = next_prev_version_id {
366 prev_version_id = Some(next_prev_version_id);
367 } else {
368 break;
369 }
370 }
371 }
372
373 {
375 let mut prev_version_id: Option<HummockVersionId> = None;
376 loop {
377 let query = hummock_time_travel_delta::Entity::find();
378 let query = if let Some(prev_version_id) = prev_version_id {
379 query.filter(hummock_time_travel_delta::Column::VersionId.gt(prev_version_id))
380 } else {
381 query
382 };
383 let mut version_stream = query
384 .order_by_asc(hummock_time_travel_delta::Column::VersionId)
385 .limit(
386 self.env
387 .opts
388 .hummock_time_travel_filter_out_objects_list_delta_batch_size
389 as u64,
390 )
391 .stream(&self.env.meta_store_ref().conn)
392 .await?;
393 let mut next_prev_version_id = None;
394 while let Some(model) = version_stream.try_next().await? {
395 let version_delta = HummockVersionDelta::from_persisted_protobuf_owned(
396 model.version_delta.to_protobuf(),
397 );
398 for object_id in version_delta.newly_added_object_ids(true) {
400 result.remove(&object_id);
401 }
402 next_prev_version_id = Some(model.version_id);
403 }
404 if let Some(next_prev_version_id) = next_prev_version_id {
405 prev_version_id = Some(next_prev_version_id);
406 } else {
407 break;
408 }
409 }
410 }
411
412 Ok(result)
413 }
414
415 pub(crate) async fn time_travel_pinned_object_count(&self) -> Result<u64> {
416 let count = hummock_sstable_info::Entity::find()
417 .count(&self.env.meta_store_ref().conn)
418 .await?;
419 Ok(count)
420 }
421
422 pub async fn epoch_to_version(
428 &self,
429 query_epoch: HummockEpoch,
430 table_id: TableId,
431 ) -> Result<HummockVersion> {
432 let sql_store = self.env.meta_store_ref();
433 let _permit = self.inflight_time_travel_query.try_acquire().map_err(|_| {
434 anyhow!(format!(
435 "too many inflight time travel queries, max_inflight_time_travel_query={}",
436 self.env.opts.max_inflight_time_travel_query
437 ))
438 })?;
439 let epoch_to_version = hummock_epoch_to_version::Entity::find()
440 .filter(
441 Condition::any()
442 .add(
443 hummock_epoch_to_version::Column::TableId
444 .eq(i64::from(table_id.as_raw_id())),
445 )
446 .add(hummock_epoch_to_version::Column::TableId.eq(0)),
448 )
449 .filter(
450 hummock_epoch_to_version::Column::Epoch
451 .lte(risingwave_meta_model::Epoch::try_from(query_epoch).unwrap()),
452 )
453 .order_by_desc(hummock_epoch_to_version::Column::Epoch)
454 .one(&sql_store.conn)
455 .await?
456 .ok_or_else(|| {
457 Error::TimeTravel(anyhow!(format!(
458 "version not found for epoch {}",
459 query_epoch
460 )))
461 })?;
462 let timer = self
463 .metrics
464 .time_travel_version_replay_latency
465 .start_timer();
466 let actual_version_id = epoch_to_version.version_id;
467 tracing::debug!(
468 query_epoch,
469 query_tz = ?(Epoch(query_epoch).as_timestamptz()),
470 actual_epoch = epoch_to_version.epoch,
471 actual_tz = ?(Epoch(u64::try_from(epoch_to_version.epoch).unwrap()).as_timestamptz()),
472 %actual_version_id,
473 "convert query epoch"
474 );
475
476 let replay_version = hummock_time_travel_version::Entity::find()
477 .filter(hummock_time_travel_version::Column::VersionId.lte(actual_version_id))
478 .order_by_desc(hummock_time_travel_version::Column::VersionId)
479 .one(&sql_store.conn)
480 .await?
481 .ok_or_else(|| {
482 Error::TimeTravel(anyhow!(format!(
483 "no replay version found for epoch {}, version {}",
484 query_epoch, actual_version_id,
485 )))
486 })?;
487 let deltas = hummock_time_travel_delta::Entity::find()
488 .filter(hummock_time_travel_delta::Column::VersionId.gt(replay_version.version_id))
489 .filter(hummock_time_travel_delta::Column::VersionId.lte(actual_version_id))
490 .order_by_asc(hummock_time_travel_delta::Column::VersionId)
491 .all(&sql_store.conn)
492 .await?;
493 let mut actual_version = replay_archive(
495 replay_version.version.to_protobuf(),
496 deltas.into_iter().map(|d| d.version_delta.to_protobuf()),
497 );
498
499 let mut sst_ids = actual_version
500 .get_sst_ids(true)
501 .into_iter()
502 .collect::<VecDeque<_>>();
503 let sst_count = sst_ids.len();
504 let mut sst_id_to_info = HashMap::with_capacity(sst_count);
505 let sst_info_fetch_batch_size = self.env.opts.hummock_time_travel_sst_info_fetch_batch_size;
506 while !sst_ids.is_empty() {
507 let sst_infos = hummock_sstable_info::Entity::find()
508 .filter(hummock_sstable_info::Column::SstId.is_in(
509 sst_ids.drain(..std::cmp::min(sst_info_fetch_batch_size, sst_ids.len())),
510 ))
511 .all(&sql_store.conn)
512 .await?;
513 for sst_info in sst_infos {
514 let sst_info: SstableInfo = sst_info.sstable_info.to_protobuf().into();
515 sst_id_to_info.insert(sst_info.sst_id, sst_info);
516 }
517 }
518 if sst_count != sst_id_to_info.len() {
519 return Err(Error::TimeTravel(anyhow!(format!(
520 "some SstableInfos not found for epoch {}, version {}",
521 query_epoch, actual_version_id,
522 ))));
523 }
524 refill_version(&mut actual_version, &sst_id_to_info, table_id);
525 timer.observe_duration();
526 Ok(actual_version)
527 }
528
529 pub(crate) async fn write_time_travel_metadata(
530 &self,
531 txn: &DatabaseTransaction,
532 version: Option<&HummockVersion>,
533 delta: HummockVersionDelta,
534 time_travel_table_ids: HashSet<StateTableId>,
535 skip_sst_ids: &HashSet<HummockSstableId>,
536 tables_to_commit: impl Iterator<Item = (&TableId, &CompactionGroupId, u64)>,
537 ) -> Result<Option<HashSet<HummockSstableId>>> {
538 let _timer = self
539 .metrics
540 .time_travel_write_metadata_latency
541 .start_timer();
542 if self
543 .env
544 .system_params_reader()
545 .await
546 .time_travel_retention_ms()
547 == 0
548 {
549 return Ok(None);
550 }
551 async fn write_sstable_infos(
552 mut sst_infos: impl Iterator<Item = &SstableInfo>,
553 txn: &DatabaseTransaction,
554 batch_size: usize,
555 ) -> Result<usize> {
556 let mut count = 0;
557 let mut is_finished = false;
558 while !is_finished {
559 let mut remain = batch_size;
560 let mut batch = vec![];
561 while remain > 0 {
562 let Some(sst_info) = sst_infos.next() else {
563 is_finished = true;
564 break;
565 };
566 batch.push(hummock_sstable_info::ActiveModel {
567 sst_id: Set(sst_info.sst_id),
568 object_id: Set(sst_info.object_id),
569 sstable_info: Set(SstableInfoV2Backend::from(&sst_info.to_protobuf())),
570 });
571 remain -= 1;
572 count += 1;
573 }
574 if batch.is_empty() {
575 break;
576 }
577 hummock_sstable_info::Entity::insert_many(batch)
578 .on_conflict_do_nothing()
579 .exec(txn)
580 .await?;
581 }
582 Ok(count)
583 }
584
585 let mut batch = vec![];
586 for (table_id, _cg_id, committed_epoch) in tables_to_commit {
587 let m = hummock_epoch_to_version::ActiveModel {
588 epoch: Set(committed_epoch.try_into().unwrap()),
589 table_id: Set(i64::from(table_id.as_raw_id())),
590 version_id: Set(delta.id),
591 };
592 batch.push(m);
593 if batch.len()
594 >= self
595 .env
596 .opts
597 .hummock_time_travel_epoch_version_insert_batch_size
598 {
599 hummock_epoch_to_version::Entity::insert_many(std::mem::take(&mut batch))
601 .do_nothing()
602 .exec(txn)
603 .await?;
604 }
605 }
606 if !batch.is_empty() {
607 hummock_epoch_to_version::Entity::insert_many(batch)
609 .do_nothing()
610 .exec(txn)
611 .await?;
612 }
613
614 let mut version_sst_ids = None;
615 if let Some(version) = version {
616 version_sst_ids = Some(
618 version
619 .get_sst_infos(true)
620 .filter_map(|s| {
621 if s.table_ids
622 .iter()
623 .any(|tid| time_travel_table_ids.contains(tid))
624 {
625 return Some(s.sst_id);
626 }
627 None
628 })
629 .collect(),
630 );
631 write_sstable_infos(
632 version.get_sst_infos(true).filter(|s| {
633 !skip_sst_ids.contains(&s.sst_id)
634 && s.table_ids
635 .iter()
636 .any(|tid| time_travel_table_ids.contains(tid))
637 }),
638 txn,
639 self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
640 )
641 .await?;
642 let m = hummock_time_travel_version::ActiveModel {
643 version_id: Set(version.id),
644 version: Set(
645 (&IncompleteHummockVersion::from((version, &time_travel_table_ids))
646 .to_protobuf())
647 .into(),
648 ),
649 };
650 hummock_time_travel_version::Entity::insert(m)
651 .on_conflict_do_nothing()
652 .exec(txn)
653 .await?;
654 return Ok(version_sst_ids);
656 }
657 let written = write_sstable_infos(
658 delta.newly_added_sst_infos(true).filter(|s| {
659 !skip_sst_ids.contains(&s.sst_id)
660 && s.table_ids
661 .iter()
662 .any(|tid| time_travel_table_ids.contains(tid))
663 }),
664 txn,
665 self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
666 )
667 .await?;
668 let has_state_table_info_delta = delta
669 .state_table_info_delta
670 .keys()
671 .any(|table_id| time_travel_table_ids.contains(table_id));
672 if written > 0 || has_state_table_info_delta {
673 let m = hummock_time_travel_delta::ActiveModel {
674 version_id: Set(delta.id),
675 version_delta: Set((&IncompleteHummockVersionDelta::from((
676 &delta,
677 &time_travel_table_ids,
678 ))
679 .to_protobuf())
680 .into()),
681 };
682 hummock_time_travel_delta::Entity::insert(m)
683 .on_conflict_do_nothing()
684 .exec(txn)
685 .await?;
686 }
687
688 Ok(version_sst_ids)
689 }
690}
691
692fn replay_archive(
694 version: PbHummockVersion,
695 deltas: impl Iterator<Item = PbHummockVersionDelta>,
696) -> HummockVersion {
697 let mut last_version = HummockVersion::from_persisted_protobuf_owned(version);
700 for d in deltas {
701 let d = HummockVersionDelta::from_persisted_protobuf_owned(d);
702 debug_assert!(
703 !should_mark_next_time_travel_version_snapshot(&d),
704 "unexpected time travel delta {:?}",
705 d
706 );
707 while last_version.id < d.prev_id {
710 last_version.id += 1;
711 }
712 last_version.apply_version_delta(&d);
713 }
714 last_version
715}
716
717pub fn require_sql_meta_store_err() -> Error {
718 Error::TimeTravel(anyhow!("require SQL meta store"))
719}
720
721pub fn should_mark_next_time_travel_version_snapshot(delta: &HummockVersionDelta) -> bool {
723 delta.group_deltas.iter().any(|(_, deltas)| {
724 deltas
725 .group_deltas
726 .iter()
727 .any(|d| !matches!(d, GroupDeltaCommon::NewL0SubLevel(_)))
728 })
729}