risingwave_meta/hummock/manager/
time_travel.rs1use std::collections::{HashMap, HashSet, VecDeque};
16
17use anyhow::anyhow;
18use futures::TryStreamExt;
19use risingwave_common::catalog::TableId;
20use risingwave_common::system_param::reader::SystemParamsRead;
21use risingwave_common::util::epoch::Epoch;
22use risingwave_hummock_sdk::compaction_group::StateTableId;
23use risingwave_hummock_sdk::sstable_info::SstableInfo;
24use risingwave_hummock_sdk::time_travel::{
25 IncompleteHummockVersion, IncompleteHummockVersionDelta, refill_version,
26};
27use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion, HummockVersionDelta};
28use risingwave_hummock_sdk::{
29 CompactionGroupId, HummockEpoch, HummockObjectId, HummockSstableId, HummockSstableObjectId,
30};
31use risingwave_meta_model::hummock_sstable_info::SstableInfoV2Backend;
32use risingwave_meta_model::{
33 HummockVersionId, hummock_epoch_to_version, hummock_sstable_info, hummock_time_travel_delta,
34 hummock_time_travel_version,
35};
36use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta};
37use sea_orm::ActiveValue::Set;
38use sea_orm::{
39 ColumnTrait, Condition, DatabaseTransaction, EntityTrait, PaginatorTrait, QueryFilter,
40 QueryOrder, QuerySelect, TransactionTrait,
41};
42
43use crate::hummock::HummockManager;
44use crate::hummock::error::{Error, Result};
45
46impl HummockManager {
48 pub(crate) async fn init_time_travel_state(&self) -> Result<()> {
49 let sql_store = self.env.meta_store_ref();
50 let mut guard = self.versioning.write().await;
51 guard.mark_next_time_travel_version_snapshot();
52
53 guard.last_time_travel_snapshot_sst_ids = HashSet::new();
54 let Some(version) = hummock_time_travel_version::Entity::find()
55 .order_by_desc(hummock_time_travel_version::Column::VersionId)
56 .one(&sql_store.conn)
57 .await?
58 .map(|v| IncompleteHummockVersion::from_persisted_protobuf(&v.version.to_protobuf()))
59 else {
60 return Ok(());
61 };
62 guard.last_time_travel_snapshot_sst_ids = version.get_sst_ids(true);
63 Ok(())
64 }
65
66 pub(crate) async fn truncate_time_travel_metadata(
67 &self,
68 epoch_watermark: HummockEpoch,
69 ) -> Result<()> {
70 let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
71 let sql_store = self.env.meta_store_ref();
72 let txn = sql_store.conn.begin().await?;
73 let version_watermark = hummock_epoch_to_version::Entity::find()
74 .filter(
75 hummock_epoch_to_version::Column::Epoch
76 .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
77 )
78 .order_by_desc(hummock_epoch_to_version::Column::Epoch)
79 .order_by_asc(hummock_epoch_to_version::Column::VersionId)
80 .one(&txn)
81 .await?;
82 let Some(version_watermark) = version_watermark else {
83 txn.commit().await?;
84 return Ok(());
85 };
86 let watermark_version_id = std::cmp::min(
87 version_watermark.version_id,
88 min_pinned_version_id.to_u64().try_into().unwrap(),
89 );
90 let res = hummock_epoch_to_version::Entity::delete_many()
91 .filter(
92 hummock_epoch_to_version::Column::Epoch
93 .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
94 )
95 .exec(&txn)
96 .await?;
97 tracing::debug!(
98 epoch_watermark,
99 "delete {} rows from hummock_epoch_to_version",
100 res.rows_affected
101 );
102 let latest_valid_version = hummock_time_travel_version::Entity::find()
103 .filter(hummock_time_travel_version::Column::VersionId.lte(watermark_version_id))
104 .order_by_desc(hummock_time_travel_version::Column::VersionId)
105 .one(&txn)
106 .await?
107 .map(|m| IncompleteHummockVersion::from_persisted_protobuf(&m.version.to_protobuf()));
108 let Some(latest_valid_version) = latest_valid_version else {
109 txn.commit().await?;
110 return Ok(());
111 };
112 let (
113 latest_valid_version_id,
114 latest_valid_version_sst_ids,
115 latest_valid_version_object_ids,
116 ) = {
117 (
118 latest_valid_version.id,
119 latest_valid_version.get_sst_ids(true),
120 latest_valid_version.get_object_ids(true),
121 )
122 };
123 let mut object_ids_to_delete: HashSet<_> = HashSet::default();
124 let version_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
125 hummock_time_travel_version::Entity::find()
126 .select_only()
127 .column(hummock_time_travel_version::Column::VersionId)
128 .filter(
129 hummock_time_travel_version::Column::VersionId
130 .lt(latest_valid_version_id.to_u64()),
131 )
132 .order_by_desc(hummock_time_travel_version::Column::VersionId)
133 .into_tuple()
134 .all(&txn)
135 .await?;
136 let delta_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
137 hummock_time_travel_delta::Entity::find()
138 .select_only()
139 .column(hummock_time_travel_delta::Column::VersionId)
140 .filter(
141 hummock_time_travel_delta::Column::VersionId
142 .lt(latest_valid_version_id.to_u64()),
143 )
144 .into_tuple()
145 .all(&txn)
146 .await?;
147 let delete_sst_batch_size = self
149 .env
150 .opts
151 .hummock_time_travel_epoch_version_insert_batch_size;
152 let mut sst_ids_to_delete: HashSet<_> = HashSet::default();
153 async fn delete_sst_in_batch(
154 txn: &DatabaseTransaction,
155 sst_ids_to_delete: HashSet<HummockSstableId>,
156 delete_sst_batch_size: usize,
157 ) -> Result<()> {
158 for start_idx in 0..=(sst_ids_to_delete.len().saturating_sub(1) / delete_sst_batch_size)
159 {
160 hummock_sstable_info::Entity::delete_many()
161 .filter(
162 hummock_sstable_info::Column::SstId.is_in(
163 sst_ids_to_delete
164 .iter()
165 .skip(start_idx * delete_sst_batch_size)
166 .take(delete_sst_batch_size)
167 .map(|sst_id| sst_id.inner()),
168 ),
169 )
170 .exec(txn)
171 .await?;
172 }
173 Ok(())
174 }
175 for delta_id_to_delete in delta_ids_to_delete {
176 let delta_to_delete = hummock_time_travel_delta::Entity::find_by_id(delta_id_to_delete)
177 .one(&txn)
178 .await?
179 .ok_or_else(|| {
180 Error::TimeTravel(anyhow!(format!(
181 "version delta {} not found",
182 delta_id_to_delete
183 )))
184 })?;
185 let delta_to_delete = IncompleteHummockVersionDelta::from_persisted_protobuf(
186 &delta_to_delete.version_delta.to_protobuf(),
187 );
188 let new_sst_ids = delta_to_delete.newly_added_sst_ids(true);
189 sst_ids_to_delete.extend(&new_sst_ids - &latest_valid_version_sst_ids);
191 if sst_ids_to_delete.len() >= delete_sst_batch_size {
192 delete_sst_in_batch(
193 &txn,
194 std::mem::take(&mut sst_ids_to_delete),
195 delete_sst_batch_size,
196 )
197 .await?;
198 }
199 let new_object_ids = delta_to_delete.newly_added_object_ids(true);
200 object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
201 }
202 let mut next_version_sst_ids = latest_valid_version_sst_ids;
203 for prev_version_id in version_ids_to_delete {
204 let prev_version = {
205 let prev_version = hummock_time_travel_version::Entity::find_by_id(prev_version_id)
206 .one(&txn)
207 .await?
208 .ok_or_else(|| {
209 Error::TimeTravel(anyhow!(format!(
210 "prev_version {} not found",
211 prev_version_id
212 )))
213 })?;
214 IncompleteHummockVersion::from_persisted_protobuf(
215 &prev_version.version.to_protobuf(),
216 )
217 };
218 let sst_ids = prev_version.get_sst_ids(true);
219 sst_ids_to_delete.extend(&sst_ids - &next_version_sst_ids);
221 if sst_ids_to_delete.len() >= delete_sst_batch_size {
222 delete_sst_in_batch(
223 &txn,
224 std::mem::take(&mut sst_ids_to_delete),
225 delete_sst_batch_size,
226 )
227 .await?;
228 }
229 let new_object_ids = prev_version.get_object_ids(true);
230 object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
231 next_version_sst_ids = sst_ids;
232 }
233 if !sst_ids_to_delete.is_empty() {
234 delete_sst_in_batch(&txn, sst_ids_to_delete, delete_sst_batch_size).await?;
235 }
236
237 if !object_ids_to_delete.is_empty() {
238 self.gc_manager
241 .add_may_delete_object_ids(object_ids_to_delete.into_iter());
242 }
243
244 let res = hummock_time_travel_version::Entity::delete_many()
245 .filter(
246 hummock_time_travel_version::Column::VersionId.lt(latest_valid_version_id.to_u64()),
247 )
248 .exec(&txn)
249 .await?;
250 tracing::debug!(
251 epoch_watermark_version_id = ?watermark_version_id,
252 ?latest_valid_version_id,
253 "delete {} rows from hummock_time_travel_version",
254 res.rows_affected
255 );
256
257 let res = hummock_time_travel_delta::Entity::delete_many()
258 .filter(
259 hummock_time_travel_delta::Column::VersionId.lt(latest_valid_version_id.to_u64()),
260 )
261 .exec(&txn)
262 .await?;
263 tracing::debug!(
264 epoch_watermark_version_id = ?watermark_version_id,
265 ?latest_valid_version_id,
266 "delete {} rows from hummock_time_travel_delta",
267 res.rows_affected
268 );
269
270 txn.commit().await?;
271 Ok(())
272 }
273
274 pub(crate) async fn filter_out_objects_by_time_travel_v1(
275 &self,
276 objects: impl Iterator<Item = HummockObjectId>,
277 ) -> Result<HashSet<HummockObjectId>> {
278 let batch_size = self
279 .env
280 .opts
281 .hummock_time_travel_filter_out_objects_batch_size;
282 let mut result: HashSet<_> = objects.collect();
285 let mut remain_sst: VecDeque<_> = result
286 .iter()
287 .map(|object_id| {
288 let HummockObjectId::Sstable(sst_id) = object_id;
289 *sst_id
290 })
291 .collect();
292 while !remain_sst.is_empty() {
293 let batch = remain_sst
294 .drain(..std::cmp::min(remain_sst.len(), batch_size))
295 .map(|object_id| object_id.as_raw().inner());
296 let reject_object_ids: Vec<risingwave_meta_model::HummockSstableObjectId> =
297 hummock_sstable_info::Entity::find()
298 .filter(hummock_sstable_info::Column::ObjectId.is_in(batch))
299 .select_only()
300 .column(hummock_sstable_info::Column::ObjectId)
301 .into_tuple()
302 .all(&self.env.meta_store_ref().conn)
303 .await?;
304 for reject in reject_object_ids {
305 match HummockObjectId::Sstable(0.into()) {
309 HummockObjectId::Sstable(_) => {}
310 };
311 let reject: u64 = reject.try_into().unwrap();
312 let object_id = HummockObjectId::Sstable(HummockSstableObjectId::from(reject));
313 result.remove(&object_id);
314 }
315 }
316 Ok(result)
317 }
318
319 pub(crate) async fn filter_out_objects_by_time_travel(
320 &self,
321 objects: impl Iterator<Item = HummockObjectId>,
322 ) -> Result<HashSet<HummockObjectId>> {
323 if self.env.opts.hummock_time_travel_filter_out_objects_v1 {
324 return self.filter_out_objects_by_time_travel_v1(objects).await;
325 }
326 let mut result: HashSet<_> = objects.collect();
327
328 {
330 let mut prev_version_id: Option<HummockVersionId> = None;
331 loop {
332 let query = hummock_time_travel_version::Entity::find();
333 let query = if let Some(prev_version_id) = prev_version_id {
334 query.filter(hummock_time_travel_version::Column::VersionId.gt(prev_version_id))
335 } else {
336 query
337 };
338 let mut version_stream = query
339 .order_by_asc(hummock_time_travel_version::Column::VersionId)
340 .limit(
341 self.env
342 .opts
343 .hummock_time_travel_filter_out_objects_list_version_batch_size
344 as u64,
345 )
346 .stream(&self.env.meta_store_ref().conn)
347 .await?;
348 let mut next_prev_version_id = None;
349 while let Some(model) = version_stream.try_next().await? {
350 let version =
351 HummockVersion::from_persisted_protobuf(&model.version.to_protobuf());
352 for object_id in version.get_object_ids(true) {
353 result.remove(&object_id);
354 }
355 next_prev_version_id = Some(model.version_id);
356 }
357 if let Some(next_prev_version_id) = next_prev_version_id {
358 prev_version_id = Some(next_prev_version_id);
359 } else {
360 break;
361 }
362 }
363 }
364
365 {
367 let mut prev_version_id: Option<HummockVersionId> = None;
368 loop {
369 let query = hummock_time_travel_delta::Entity::find();
370 let query = if let Some(prev_version_id) = prev_version_id {
371 query.filter(hummock_time_travel_delta::Column::VersionId.gt(prev_version_id))
372 } else {
373 query
374 };
375 let mut version_stream = query
376 .order_by_asc(hummock_time_travel_delta::Column::VersionId)
377 .limit(
378 self.env
379 .opts
380 .hummock_time_travel_filter_out_objects_list_delta_batch_size
381 as u64,
382 )
383 .stream(&self.env.meta_store_ref().conn)
384 .await?;
385 let mut next_prev_version_id = None;
386 while let Some(model) = version_stream.try_next().await? {
387 let version_delta = HummockVersionDelta::from_persisted_protobuf(
388 &model.version_delta.to_protobuf(),
389 );
390 for object_id in version_delta.newly_added_object_ids(true) {
392 result.remove(&object_id);
393 }
394 next_prev_version_id = Some(model.version_id);
395 }
396 if let Some(next_prev_version_id) = next_prev_version_id {
397 prev_version_id = Some(next_prev_version_id);
398 } else {
399 break;
400 }
401 }
402 }
403
404 Ok(result)
405 }
406
407 pub(crate) async fn time_travel_pinned_object_count(&self) -> Result<u64> {
408 let count = hummock_sstable_info::Entity::find()
409 .count(&self.env.meta_store_ref().conn)
410 .await?;
411 Ok(count)
412 }
413
414 pub async fn epoch_to_version(
420 &self,
421 query_epoch: HummockEpoch,
422 table_id: u32,
423 ) -> Result<HummockVersion> {
424 let sql_store = self.env.meta_store_ref();
425 let _permit = self.inflight_time_travel_query.try_acquire().map_err(|_| {
426 anyhow!(format!(
427 "too many inflight time travel queries, max_inflight_time_travel_query={}",
428 self.env.opts.max_inflight_time_travel_query
429 ))
430 })?;
431 let epoch_to_version = hummock_epoch_to_version::Entity::find()
432 .filter(
433 Condition::any()
434 .add(hummock_epoch_to_version::Column::TableId.eq(i64::from(table_id)))
435 .add(hummock_epoch_to_version::Column::TableId.eq(0)),
437 )
438 .filter(
439 hummock_epoch_to_version::Column::Epoch
440 .lte(risingwave_meta_model::Epoch::try_from(query_epoch).unwrap()),
441 )
442 .order_by_desc(hummock_epoch_to_version::Column::Epoch)
443 .one(&sql_store.conn)
444 .await?
445 .ok_or_else(|| {
446 Error::TimeTravel(anyhow!(format!(
447 "version not found for epoch {}",
448 query_epoch
449 )))
450 })?;
451 let timer = self
452 .metrics
453 .time_travel_version_replay_latency
454 .start_timer();
455 let actual_version_id = epoch_to_version.version_id;
456 tracing::debug!(
457 query_epoch,
458 query_tz = ?(Epoch(query_epoch).as_timestamptz()),
459 actual_epoch = epoch_to_version.epoch,
460 actual_tz = ?(Epoch(u64::try_from(epoch_to_version.epoch).unwrap()).as_timestamptz()),
461 actual_version_id,
462 "convert query epoch"
463 );
464
465 let replay_version = hummock_time_travel_version::Entity::find()
466 .filter(hummock_time_travel_version::Column::VersionId.lte(actual_version_id))
467 .order_by_desc(hummock_time_travel_version::Column::VersionId)
468 .one(&sql_store.conn)
469 .await?
470 .ok_or_else(|| {
471 Error::TimeTravel(anyhow!(format!(
472 "no replay version found for epoch {}, version {}",
473 query_epoch, actual_version_id,
474 )))
475 })?;
476 let deltas = hummock_time_travel_delta::Entity::find()
477 .filter(hummock_time_travel_delta::Column::VersionId.gt(replay_version.version_id))
478 .filter(hummock_time_travel_delta::Column::VersionId.lte(actual_version_id))
479 .order_by_asc(hummock_time_travel_delta::Column::VersionId)
480 .all(&sql_store.conn)
481 .await?;
482 let mut actual_version = replay_archive(
484 replay_version.version.to_protobuf(),
485 deltas.into_iter().map(|d| d.version_delta.to_protobuf()),
486 );
487
488 let mut sst_ids = actual_version
489 .get_sst_ids(true)
490 .into_iter()
491 .collect::<VecDeque<_>>();
492 let sst_count = sst_ids.len();
493 let mut sst_id_to_info = HashMap::with_capacity(sst_count);
494 let sst_info_fetch_batch_size = self.env.opts.hummock_time_travel_sst_info_fetch_batch_size;
495 while !sst_ids.is_empty() {
496 let sst_infos = hummock_sstable_info::Entity::find()
497 .filter(
498 hummock_sstable_info::Column::SstId.is_in(
499 sst_ids
500 .drain(..std::cmp::min(sst_info_fetch_batch_size, sst_ids.len()))
501 .map(|sst_id| sst_id.inner()),
502 ),
503 )
504 .all(&sql_store.conn)
505 .await?;
506 for sst_info in sst_infos {
507 let sst_info: SstableInfo = sst_info.sstable_info.to_protobuf().into();
508 sst_id_to_info.insert(sst_info.sst_id, sst_info);
509 }
510 }
511 if sst_count != sst_id_to_info.len() {
512 return Err(Error::TimeTravel(anyhow!(format!(
513 "some SstableInfos not found for epoch {}, version {}",
514 query_epoch, actual_version_id,
515 ))));
516 }
517 refill_version(&mut actual_version, &sst_id_to_info, table_id);
518 timer.observe_duration();
519 Ok(actual_version)
520 }
521
522 pub(crate) async fn write_time_travel_metadata(
523 &self,
524 txn: &DatabaseTransaction,
525 version: Option<&HummockVersion>,
526 delta: HummockVersionDelta,
527 time_travel_table_ids: HashSet<StateTableId>,
528 skip_sst_ids: &HashSet<HummockSstableId>,
529 tables_to_commit: impl Iterator<Item = (&TableId, &CompactionGroupId, u64)>,
530 ) -> Result<Option<HashSet<HummockSstableId>>> {
531 if self
532 .env
533 .system_params_reader()
534 .await
535 .time_travel_retention_ms()
536 == 0
537 {
538 return Ok(None);
539 }
540 async fn write_sstable_infos(
541 mut sst_infos: impl Iterator<Item = &SstableInfo>,
542 txn: &DatabaseTransaction,
543 batch_size: usize,
544 ) -> Result<usize> {
545 let mut count = 0;
546 let mut is_finished = false;
547 while !is_finished {
548 let mut remain = batch_size;
549 let mut batch = vec![];
550 while remain > 0 {
551 let Some(sst_info) = sst_infos.next() else {
552 is_finished = true;
553 break;
554 };
555 batch.push(hummock_sstable_info::ActiveModel {
556 sst_id: Set(sst_info.sst_id.inner().try_into().unwrap()),
557 object_id: Set(sst_info.object_id.inner().try_into().unwrap()),
558 sstable_info: Set(SstableInfoV2Backend::from(&sst_info.to_protobuf())),
559 });
560 remain -= 1;
561 count += 1;
562 }
563 if batch.is_empty() {
564 break;
565 }
566 hummock_sstable_info::Entity::insert_many(batch)
567 .on_conflict_do_nothing()
568 .exec(txn)
569 .await?;
570 }
571 Ok(count)
572 }
573
574 let mut batch = vec![];
575 for (table_id, _cg_id, committed_epoch) in tables_to_commit {
576 let version_id: u64 = delta.id.to_u64();
577 let m = hummock_epoch_to_version::ActiveModel {
578 epoch: Set(committed_epoch.try_into().unwrap()),
579 table_id: Set(table_id.table_id.into()),
580 version_id: Set(version_id.try_into().unwrap()),
581 };
582 batch.push(m);
583 if batch.len()
584 >= self
585 .env
586 .opts
587 .hummock_time_travel_epoch_version_insert_batch_size
588 {
589 hummock_epoch_to_version::Entity::insert_many(std::mem::take(&mut batch))
591 .do_nothing()
592 .exec(txn)
593 .await?;
594 }
595 }
596 if !batch.is_empty() {
597 hummock_epoch_to_version::Entity::insert_many(batch)
599 .do_nothing()
600 .exec(txn)
601 .await?;
602 }
603
604 let mut version_sst_ids = None;
605 if let Some(version) = version {
606 version_sst_ids = Some(
608 version
609 .get_sst_infos(true)
610 .filter_map(|s| {
611 if s.table_ids
612 .iter()
613 .any(|tid| time_travel_table_ids.contains(tid))
614 {
615 return Some(s.sst_id);
616 }
617 None
618 })
619 .collect(),
620 );
621 write_sstable_infos(
622 version.get_sst_infos(true).filter(|s| {
623 !skip_sst_ids.contains(&s.sst_id)
624 && s.table_ids
625 .iter()
626 .any(|tid| time_travel_table_ids.contains(tid))
627 }),
628 txn,
629 self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
630 )
631 .await?;
632 let m = hummock_time_travel_version::ActiveModel {
633 version_id: Set(risingwave_meta_model::HummockVersionId::try_from(
634 version.id.to_u64(),
635 )
636 .unwrap()),
637 version: Set(
638 (&IncompleteHummockVersion::from((version, &time_travel_table_ids))
639 .to_protobuf())
640 .into(),
641 ),
642 };
643 hummock_time_travel_version::Entity::insert(m)
644 .on_conflict_do_nothing()
645 .exec(txn)
646 .await?;
647 return Ok(version_sst_ids);
649 }
650 let written = write_sstable_infos(
651 delta.newly_added_sst_infos(true).filter(|s| {
652 !skip_sst_ids.contains(&s.sst_id)
653 && s.table_ids
654 .iter()
655 .any(|tid| time_travel_table_ids.contains(tid))
656 }),
657 txn,
658 self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
659 )
660 .await?;
661 if written > 0 {
663 let m = hummock_time_travel_delta::ActiveModel {
664 version_id: Set(risingwave_meta_model::HummockVersionId::try_from(
665 delta.id.to_u64(),
666 )
667 .unwrap()),
668 version_delta: Set((&IncompleteHummockVersionDelta::from((
669 &delta,
670 &time_travel_table_ids,
671 ))
672 .to_protobuf())
673 .into()),
674 };
675 hummock_time_travel_delta::Entity::insert(m)
676 .on_conflict_do_nothing()
677 .exec(txn)
678 .await?;
679 }
680
681 Ok(version_sst_ids)
682 }
683}
684
685fn replay_archive(
687 version: PbHummockVersion,
688 deltas: impl Iterator<Item = PbHummockVersionDelta>,
689) -> HummockVersion {
690 let mut last_version = HummockVersion::from_persisted_protobuf(&version);
693 for d in deltas {
694 let d = HummockVersionDelta::from_persisted_protobuf(&d);
695 debug_assert!(
696 !should_mark_next_time_travel_version_snapshot(&d),
697 "unexpected time travel delta {:?}",
698 d
699 );
700 while last_version.id < d.prev_id {
703 last_version.id = last_version.id + 1;
704 }
705 last_version.apply_version_delta(&d);
706 }
707 last_version
708}
709
710pub fn require_sql_meta_store_err() -> Error {
711 Error::TimeTravel(anyhow!("require SQL meta store"))
712}
713
714pub fn should_mark_next_time_travel_version_snapshot(delta: &HummockVersionDelta) -> bool {
716 delta.group_deltas.iter().any(|(_, deltas)| {
717 deltas
718 .group_deltas
719 .iter()
720 .any(|d| !matches!(d, GroupDeltaCommon::NewL0SubLevel(_)))
721 })
722}