risingwave_meta/hummock/manager/
time_travel.rs1use std::collections::{HashMap, HashSet, VecDeque};
16
17use anyhow::anyhow;
18use futures::TryStreamExt;
19use risingwave_common::catalog::TableId;
20use risingwave_common::system_param::reader::SystemParamsRead;
21use risingwave_common::util::epoch::Epoch;
22use risingwave_hummock_sdk::compaction_group::StateTableId;
23use risingwave_hummock_sdk::sstable_info::SstableInfo;
24use risingwave_hummock_sdk::time_travel::{
25 IncompleteHummockVersion, IncompleteHummockVersionDelta, refill_version,
26};
27use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion, HummockVersionDelta};
28use risingwave_hummock_sdk::{
29 CompactionGroupId, HummockEpoch, HummockObjectId, HummockSstableId, HummockSstableObjectId,
30};
31use risingwave_meta_model::hummock_sstable_info::SstableInfoV2Backend;
32use risingwave_meta_model::{
33 HummockVersionId, hummock_epoch_to_version, hummock_sstable_info, hummock_time_travel_delta,
34 hummock_time_travel_version,
35};
36use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta};
37use sea_orm::ActiveValue::Set;
38use sea_orm::{
39 ColumnTrait, Condition, DatabaseTransaction, EntityTrait, PaginatorTrait, QueryFilter,
40 QueryOrder, QuerySelect, TransactionTrait,
41};
42use tracing::info;
43
44use crate::hummock::HummockManager;
45use crate::hummock::error::{Error, Result};
46
47impl HummockManager {
49 pub(crate) async fn init_time_travel_state(&self) -> Result<()> {
50 let sql_store = self.env.meta_store_ref();
51 let mut guard = self.versioning.write().await;
52 guard.mark_next_time_travel_version_snapshot();
53
54 guard.last_time_travel_snapshot_sst_ids = HashSet::new();
55 let Some(version) = hummock_time_travel_version::Entity::find()
56 .order_by_desc(hummock_time_travel_version::Column::VersionId)
57 .one(&sql_store.conn)
58 .await?
59 .map(|v| IncompleteHummockVersion::from_persisted_protobuf(&v.version.to_protobuf()))
60 else {
61 return Ok(());
62 };
63 guard.last_time_travel_snapshot_sst_ids = version.get_sst_ids(true);
64 Ok(())
65 }
66
67 pub(crate) async fn truncate_time_travel_metadata(
68 &self,
69 epoch_watermark: HummockEpoch,
70 ) -> Result<()> {
71 let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
72 let sql_store = self.env.meta_store_ref();
73 let txn = sql_store.conn.begin().await?;
74 let version_watermark = hummock_epoch_to_version::Entity::find()
75 .filter(
76 hummock_epoch_to_version::Column::Epoch
77 .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
78 )
79 .order_by_desc(hummock_epoch_to_version::Column::Epoch)
80 .order_by_asc(hummock_epoch_to_version::Column::VersionId)
81 .one(&txn)
82 .await?;
83 let Some(version_watermark) = version_watermark else {
84 txn.commit().await?;
85 return Ok(());
86 };
87 let watermark_version_id = std::cmp::min(
88 version_watermark.version_id,
89 min_pinned_version_id.to_u64().try_into().unwrap(),
90 );
91 let res = hummock_epoch_to_version::Entity::delete_many()
92 .filter(
93 hummock_epoch_to_version::Column::Epoch
94 .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
95 )
96 .exec(&txn)
97 .await?;
98 tracing::debug!(
99 epoch_watermark,
100 "delete {} rows from hummock_epoch_to_version",
101 res.rows_affected
102 );
103 let latest_valid_version = hummock_time_travel_version::Entity::find()
104 .filter(hummock_time_travel_version::Column::VersionId.lte(watermark_version_id))
105 .order_by_desc(hummock_time_travel_version::Column::VersionId)
106 .one(&txn)
107 .await?
108 .map(|m| IncompleteHummockVersion::from_persisted_protobuf(&m.version.to_protobuf()));
109 let Some(latest_valid_version) = latest_valid_version else {
110 txn.commit().await?;
111 return Ok(());
112 };
113 let (
114 latest_valid_version_id,
115 latest_valid_version_sst_ids,
116 latest_valid_version_object_ids,
117 ) = {
118 (
119 latest_valid_version.id,
120 latest_valid_version.get_sst_ids(true),
121 latest_valid_version
122 .get_object_ids(true)
123 .collect::<HashSet<_>>(),
124 )
125 };
126 let mut object_ids_to_delete: HashSet<_> = HashSet::default();
127 let version_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
128 hummock_time_travel_version::Entity::find()
129 .select_only()
130 .column(hummock_time_travel_version::Column::VersionId)
131 .filter(
132 hummock_time_travel_version::Column::VersionId
133 .lt(latest_valid_version_id.to_u64()),
134 )
135 .order_by_desc(hummock_time_travel_version::Column::VersionId)
136 .into_tuple()
137 .all(&txn)
138 .await?;
139 let delta_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
140 hummock_time_travel_delta::Entity::find()
141 .select_only()
142 .column(hummock_time_travel_delta::Column::VersionId)
143 .filter(
144 hummock_time_travel_delta::Column::VersionId
145 .lt(latest_valid_version_id.to_u64()),
146 )
147 .into_tuple()
148 .all(&txn)
149 .await?;
150 let delete_sst_batch_size = self
152 .env
153 .opts
154 .hummock_time_travel_epoch_version_insert_batch_size;
155 let mut sst_ids_to_delete: HashSet<_> = HashSet::default();
156 async fn delete_sst_in_batch(
157 txn: &DatabaseTransaction,
158 sst_ids_to_delete: HashSet<HummockSstableId>,
159 delete_sst_batch_size: usize,
160 ) -> Result<()> {
161 for start_idx in 0..=(sst_ids_to_delete.len().saturating_sub(1) / delete_sst_batch_size)
162 {
163 hummock_sstable_info::Entity::delete_many()
164 .filter(
165 hummock_sstable_info::Column::SstId.is_in(
166 sst_ids_to_delete
167 .iter()
168 .skip(start_idx * delete_sst_batch_size)
169 .take(delete_sst_batch_size)
170 .map(|sst_id| sst_id.inner()),
171 ),
172 )
173 .exec(txn)
174 .await?;
175 }
176 Ok(())
177 }
178 for delta_id_to_delete in delta_ids_to_delete {
179 let delta_to_delete = hummock_time_travel_delta::Entity::find_by_id(delta_id_to_delete)
180 .one(&txn)
181 .await?
182 .ok_or_else(|| {
183 Error::TimeTravel(anyhow!(format!(
184 "version delta {} not found",
185 delta_id_to_delete
186 )))
187 })?;
188 let delta_to_delete = IncompleteHummockVersionDelta::from_persisted_protobuf(
189 &delta_to_delete.version_delta.to_protobuf(),
190 );
191 let new_sst_ids = delta_to_delete.newly_added_sst_ids(true);
192 sst_ids_to_delete.extend(&new_sst_ids - &latest_valid_version_sst_ids);
194 if sst_ids_to_delete.len() >= delete_sst_batch_size {
195 delete_sst_in_batch(
196 &txn,
197 std::mem::take(&mut sst_ids_to_delete),
198 delete_sst_batch_size,
199 )
200 .await?;
201 }
202 let new_object_ids = delta_to_delete.newly_added_object_ids(true);
203 object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
204 }
205 let mut next_version_sst_ids = latest_valid_version_sst_ids;
206 for prev_version_id in version_ids_to_delete {
207 let prev_version = {
208 let prev_version = hummock_time_travel_version::Entity::find_by_id(prev_version_id)
209 .one(&txn)
210 .await?
211 .ok_or_else(|| {
212 Error::TimeTravel(anyhow!(format!(
213 "prev_version {} not found",
214 prev_version_id
215 )))
216 })?;
217 IncompleteHummockVersion::from_persisted_protobuf(
218 &prev_version.version.to_protobuf(),
219 )
220 };
221 let sst_ids = prev_version.get_sst_ids(true);
222 sst_ids_to_delete.extend(&sst_ids - &next_version_sst_ids);
224 if sst_ids_to_delete.len() >= delete_sst_batch_size {
225 delete_sst_in_batch(
226 &txn,
227 std::mem::take(&mut sst_ids_to_delete),
228 delete_sst_batch_size,
229 )
230 .await?;
231 }
232 let new_object_ids: HashSet<_> = prev_version.get_object_ids(true).collect();
233 object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
234 next_version_sst_ids = sst_ids;
235 }
236 if !sst_ids_to_delete.is_empty() {
237 delete_sst_in_batch(&txn, sst_ids_to_delete, delete_sst_batch_size).await?;
238 }
239
240 if !object_ids_to_delete.is_empty() {
241 self.gc_manager
244 .add_may_delete_object_ids(object_ids_to_delete.into_iter());
245 }
246
247 let res = hummock_time_travel_version::Entity::delete_many()
248 .filter(
249 hummock_time_travel_version::Column::VersionId.lt(latest_valid_version_id.to_u64()),
250 )
251 .exec(&txn)
252 .await?;
253 tracing::debug!(
254 epoch_watermark_version_id = ?watermark_version_id,
255 ?latest_valid_version_id,
256 "delete {} rows from hummock_time_travel_version",
257 res.rows_affected
258 );
259
260 let res = hummock_time_travel_delta::Entity::delete_many()
261 .filter(
262 hummock_time_travel_delta::Column::VersionId.lt(latest_valid_version_id.to_u64()),
263 )
264 .exec(&txn)
265 .await?;
266 tracing::debug!(
267 epoch_watermark_version_id = ?watermark_version_id,
268 ?latest_valid_version_id,
269 "delete {} rows from hummock_time_travel_delta",
270 res.rows_affected
271 );
272
273 txn.commit().await?;
274 Ok(())
275 }
276
277 pub(crate) async fn filter_out_objects_by_time_travel_v1(
278 &self,
279 objects: impl Iterator<Item = HummockObjectId>,
280 ) -> Result<HashSet<HummockObjectId>> {
281 let batch_size = self
282 .env
283 .opts
284 .hummock_time_travel_filter_out_objects_batch_size;
285 info!("filter out objects by time travel v1, only sst will remain in the result set");
286 let mut result: HashSet<_> = objects
289 .filter(|object_id| match object_id {
290 HummockObjectId::Sstable(_) => true,
291 HummockObjectId::VectorFile(_) => false,
292 })
293 .collect();
294 let mut remain_sst: VecDeque<_> = result.iter().copied().collect();
295 while !remain_sst.is_empty() {
296 let batch = remain_sst
297 .drain(..std::cmp::min(remain_sst.len(), batch_size))
298 .map(|object_id| object_id.as_raw().inner());
299 let reject_object_ids: Vec<risingwave_meta_model::HummockSstableObjectId> =
300 hummock_sstable_info::Entity::find()
301 .filter(hummock_sstable_info::Column::ObjectId.is_in(batch))
302 .select_only()
303 .column(hummock_sstable_info::Column::ObjectId)
304 .into_tuple()
305 .all(&self.env.meta_store_ref().conn)
306 .await?;
307 for reject in reject_object_ids {
308 let reject: u64 = reject.try_into().unwrap();
309 let object_id = HummockObjectId::Sstable(HummockSstableObjectId::from(reject));
310 result.remove(&object_id);
311 }
312 }
313 Ok(result)
314 }
315
316 pub(crate) async fn filter_out_objects_by_time_travel(
317 &self,
318 objects: impl Iterator<Item = HummockObjectId>,
319 ) -> Result<HashSet<HummockObjectId>> {
320 if self.env.opts.hummock_time_travel_filter_out_objects_v1 {
321 return self.filter_out_objects_by_time_travel_v1(objects).await;
322 }
323 let mut result: HashSet<_> = objects.collect();
324
325 {
327 let mut prev_version_id: Option<HummockVersionId> = None;
328 loop {
329 let query = hummock_time_travel_version::Entity::find();
330 let query = if let Some(prev_version_id) = prev_version_id {
331 query.filter(hummock_time_travel_version::Column::VersionId.gt(prev_version_id))
332 } else {
333 query
334 };
335 let mut version_stream = query
336 .order_by_asc(hummock_time_travel_version::Column::VersionId)
337 .limit(
338 self.env
339 .opts
340 .hummock_time_travel_filter_out_objects_list_version_batch_size
341 as u64,
342 )
343 .stream(&self.env.meta_store_ref().conn)
344 .await?;
345 let mut next_prev_version_id = None;
346 while let Some(model) = version_stream.try_next().await? {
347 let version =
348 HummockVersion::from_persisted_protobuf(&model.version.to_protobuf());
349 for object_id in version.get_object_ids(true) {
350 result.remove(&object_id);
351 }
352 next_prev_version_id = Some(model.version_id);
353 }
354 if let Some(next_prev_version_id) = next_prev_version_id {
355 prev_version_id = Some(next_prev_version_id);
356 } else {
357 break;
358 }
359 }
360 }
361
362 {
364 let mut prev_version_id: Option<HummockVersionId> = None;
365 loop {
366 let query = hummock_time_travel_delta::Entity::find();
367 let query = if let Some(prev_version_id) = prev_version_id {
368 query.filter(hummock_time_travel_delta::Column::VersionId.gt(prev_version_id))
369 } else {
370 query
371 };
372 let mut version_stream = query
373 .order_by_asc(hummock_time_travel_delta::Column::VersionId)
374 .limit(
375 self.env
376 .opts
377 .hummock_time_travel_filter_out_objects_list_delta_batch_size
378 as u64,
379 )
380 .stream(&self.env.meta_store_ref().conn)
381 .await?;
382 let mut next_prev_version_id = None;
383 while let Some(model) = version_stream.try_next().await? {
384 let version_delta = HummockVersionDelta::from_persisted_protobuf(
385 &model.version_delta.to_protobuf(),
386 );
387 for object_id in version_delta.newly_added_object_ids(true) {
389 result.remove(&object_id);
390 }
391 next_prev_version_id = Some(model.version_id);
392 }
393 if let Some(next_prev_version_id) = next_prev_version_id {
394 prev_version_id = Some(next_prev_version_id);
395 } else {
396 break;
397 }
398 }
399 }
400
401 Ok(result)
402 }
403
404 pub(crate) async fn time_travel_pinned_object_count(&self) -> Result<u64> {
405 let count = hummock_sstable_info::Entity::find()
406 .count(&self.env.meta_store_ref().conn)
407 .await?;
408 Ok(count)
409 }
410
411 pub async fn epoch_to_version(
417 &self,
418 query_epoch: HummockEpoch,
419 table_id: u32,
420 ) -> Result<HummockVersion> {
421 let sql_store = self.env.meta_store_ref();
422 let _permit = self.inflight_time_travel_query.try_acquire().map_err(|_| {
423 anyhow!(format!(
424 "too many inflight time travel queries, max_inflight_time_travel_query={}",
425 self.env.opts.max_inflight_time_travel_query
426 ))
427 })?;
428 let epoch_to_version = hummock_epoch_to_version::Entity::find()
429 .filter(
430 Condition::any()
431 .add(hummock_epoch_to_version::Column::TableId.eq(i64::from(table_id)))
432 .add(hummock_epoch_to_version::Column::TableId.eq(0)),
434 )
435 .filter(
436 hummock_epoch_to_version::Column::Epoch
437 .lte(risingwave_meta_model::Epoch::try_from(query_epoch).unwrap()),
438 )
439 .order_by_desc(hummock_epoch_to_version::Column::Epoch)
440 .one(&sql_store.conn)
441 .await?
442 .ok_or_else(|| {
443 Error::TimeTravel(anyhow!(format!(
444 "version not found for epoch {}",
445 query_epoch
446 )))
447 })?;
448 let timer = self
449 .metrics
450 .time_travel_version_replay_latency
451 .start_timer();
452 let actual_version_id = epoch_to_version.version_id;
453 tracing::debug!(
454 query_epoch,
455 query_tz = ?(Epoch(query_epoch).as_timestamptz()),
456 actual_epoch = epoch_to_version.epoch,
457 actual_tz = ?(Epoch(u64::try_from(epoch_to_version.epoch).unwrap()).as_timestamptz()),
458 actual_version_id,
459 "convert query epoch"
460 );
461
462 let replay_version = hummock_time_travel_version::Entity::find()
463 .filter(hummock_time_travel_version::Column::VersionId.lte(actual_version_id))
464 .order_by_desc(hummock_time_travel_version::Column::VersionId)
465 .one(&sql_store.conn)
466 .await?
467 .ok_or_else(|| {
468 Error::TimeTravel(anyhow!(format!(
469 "no replay version found for epoch {}, version {}",
470 query_epoch, actual_version_id,
471 )))
472 })?;
473 let deltas = hummock_time_travel_delta::Entity::find()
474 .filter(hummock_time_travel_delta::Column::VersionId.gt(replay_version.version_id))
475 .filter(hummock_time_travel_delta::Column::VersionId.lte(actual_version_id))
476 .order_by_asc(hummock_time_travel_delta::Column::VersionId)
477 .all(&sql_store.conn)
478 .await?;
479 let mut actual_version = replay_archive(
481 replay_version.version.to_protobuf(),
482 deltas.into_iter().map(|d| d.version_delta.to_protobuf()),
483 );
484
485 let mut sst_ids = actual_version
486 .get_sst_ids(true)
487 .into_iter()
488 .collect::<VecDeque<_>>();
489 let sst_count = sst_ids.len();
490 let mut sst_id_to_info = HashMap::with_capacity(sst_count);
491 let sst_info_fetch_batch_size = self.env.opts.hummock_time_travel_sst_info_fetch_batch_size;
492 while !sst_ids.is_empty() {
493 let sst_infos = hummock_sstable_info::Entity::find()
494 .filter(
495 hummock_sstable_info::Column::SstId.is_in(
496 sst_ids
497 .drain(..std::cmp::min(sst_info_fetch_batch_size, sst_ids.len()))
498 .map(|sst_id| sst_id.inner()),
499 ),
500 )
501 .all(&sql_store.conn)
502 .await?;
503 for sst_info in sst_infos {
504 let sst_info: SstableInfo = sst_info.sstable_info.to_protobuf().into();
505 sst_id_to_info.insert(sst_info.sst_id, sst_info);
506 }
507 }
508 if sst_count != sst_id_to_info.len() {
509 return Err(Error::TimeTravel(anyhow!(format!(
510 "some SstableInfos not found for epoch {}, version {}",
511 query_epoch, actual_version_id,
512 ))));
513 }
514 refill_version(&mut actual_version, &sst_id_to_info, table_id);
515 timer.observe_duration();
516 Ok(actual_version)
517 }
518
519 pub(crate) async fn write_time_travel_metadata(
520 &self,
521 txn: &DatabaseTransaction,
522 version: Option<&HummockVersion>,
523 delta: HummockVersionDelta,
524 time_travel_table_ids: HashSet<StateTableId>,
525 skip_sst_ids: &HashSet<HummockSstableId>,
526 tables_to_commit: impl Iterator<Item = (&TableId, &CompactionGroupId, u64)>,
527 ) -> Result<Option<HashSet<HummockSstableId>>> {
528 if self
529 .env
530 .system_params_reader()
531 .await
532 .time_travel_retention_ms()
533 == 0
534 {
535 return Ok(None);
536 }
537 async fn write_sstable_infos(
538 mut sst_infos: impl Iterator<Item = &SstableInfo>,
539 txn: &DatabaseTransaction,
540 batch_size: usize,
541 ) -> Result<usize> {
542 let mut count = 0;
543 let mut is_finished = false;
544 while !is_finished {
545 let mut remain = batch_size;
546 let mut batch = vec![];
547 while remain > 0 {
548 let Some(sst_info) = sst_infos.next() else {
549 is_finished = true;
550 break;
551 };
552 batch.push(hummock_sstable_info::ActiveModel {
553 sst_id: Set(sst_info.sst_id.inner().try_into().unwrap()),
554 object_id: Set(sst_info.object_id.inner().try_into().unwrap()),
555 sstable_info: Set(SstableInfoV2Backend::from(&sst_info.to_protobuf())),
556 });
557 remain -= 1;
558 count += 1;
559 }
560 if batch.is_empty() {
561 break;
562 }
563 hummock_sstable_info::Entity::insert_many(batch)
564 .on_conflict_do_nothing()
565 .exec(txn)
566 .await?;
567 }
568 Ok(count)
569 }
570
571 let mut batch = vec![];
572 for (table_id, _cg_id, committed_epoch) in tables_to_commit {
573 let version_id: u64 = delta.id.to_u64();
574 let m = hummock_epoch_to_version::ActiveModel {
575 epoch: Set(committed_epoch.try_into().unwrap()),
576 table_id: Set(table_id.table_id.into()),
577 version_id: Set(version_id.try_into().unwrap()),
578 };
579 batch.push(m);
580 if batch.len()
581 >= self
582 .env
583 .opts
584 .hummock_time_travel_epoch_version_insert_batch_size
585 {
586 hummock_epoch_to_version::Entity::insert_many(std::mem::take(&mut batch))
588 .do_nothing()
589 .exec(txn)
590 .await?;
591 }
592 }
593 if !batch.is_empty() {
594 hummock_epoch_to_version::Entity::insert_many(batch)
596 .do_nothing()
597 .exec(txn)
598 .await?;
599 }
600
601 let mut version_sst_ids = None;
602 if let Some(version) = version {
603 version_sst_ids = Some(
605 version
606 .get_sst_infos(true)
607 .filter_map(|s| {
608 if s.table_ids
609 .iter()
610 .any(|tid| time_travel_table_ids.contains(tid))
611 {
612 return Some(s.sst_id);
613 }
614 None
615 })
616 .collect(),
617 );
618 write_sstable_infos(
619 version.get_sst_infos(true).filter(|s| {
620 !skip_sst_ids.contains(&s.sst_id)
621 && s.table_ids
622 .iter()
623 .any(|tid| time_travel_table_ids.contains(tid))
624 }),
625 txn,
626 self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
627 )
628 .await?;
629 let m = hummock_time_travel_version::ActiveModel {
630 version_id: Set(risingwave_meta_model::HummockVersionId::try_from(
631 version.id.to_u64(),
632 )
633 .unwrap()),
634 version: Set(
635 (&IncompleteHummockVersion::from((version, &time_travel_table_ids))
636 .to_protobuf())
637 .into(),
638 ),
639 };
640 hummock_time_travel_version::Entity::insert(m)
641 .on_conflict_do_nothing()
642 .exec(txn)
643 .await?;
644 return Ok(version_sst_ids);
646 }
647 let written = write_sstable_infos(
648 delta.newly_added_sst_infos(true).filter(|s| {
649 !skip_sst_ids.contains(&s.sst_id)
650 && s.table_ids
651 .iter()
652 .any(|tid| time_travel_table_ids.contains(tid))
653 }),
654 txn,
655 self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
656 )
657 .await?;
658 if written > 0 {
660 let m = hummock_time_travel_delta::ActiveModel {
661 version_id: Set(risingwave_meta_model::HummockVersionId::try_from(
662 delta.id.to_u64(),
663 )
664 .unwrap()),
665 version_delta: Set((&IncompleteHummockVersionDelta::from((
666 &delta,
667 &time_travel_table_ids,
668 ))
669 .to_protobuf())
670 .into()),
671 };
672 hummock_time_travel_delta::Entity::insert(m)
673 .on_conflict_do_nothing()
674 .exec(txn)
675 .await?;
676 }
677
678 Ok(version_sst_ids)
679 }
680}
681
682fn replay_archive(
684 version: PbHummockVersion,
685 deltas: impl Iterator<Item = PbHummockVersionDelta>,
686) -> HummockVersion {
687 let mut last_version = HummockVersion::from_persisted_protobuf(&version);
690 for d in deltas {
691 let d = HummockVersionDelta::from_persisted_protobuf(&d);
692 debug_assert!(
693 !should_mark_next_time_travel_version_snapshot(&d),
694 "unexpected time travel delta {:?}",
695 d
696 );
697 while last_version.id < d.prev_id {
700 last_version.id = last_version.id + 1;
701 }
702 last_version.apply_version_delta(&d);
703 }
704 last_version
705}
706
707pub fn require_sql_meta_store_err() -> Error {
708 Error::TimeTravel(anyhow!("require SQL meta store"))
709}
710
711pub fn should_mark_next_time_travel_version_snapshot(delta: &HummockVersionDelta) -> bool {
713 delta.group_deltas.iter().any(|(_, deltas)| {
714 deltas
715 .group_deltas
716 .iter()
717 .any(|d| !matches!(d, GroupDeltaCommon::NewL0SubLevel(_)))
718 })
719}