risingwave_meta/hummock/manager/
time_travel.rs1use std::collections::{HashMap, HashSet, VecDeque};
16
17use anyhow::anyhow;
18use futures::TryStreamExt;
19use risingwave_common::catalog::TableId;
20use risingwave_common::system_param::reader::SystemParamsRead;
21use risingwave_common::util::epoch::Epoch;
22use risingwave_hummock_sdk::compaction_group::StateTableId;
23use risingwave_hummock_sdk::sstable_info::SstableInfo;
24use risingwave_hummock_sdk::time_travel::{
25 IncompleteHummockVersion, IncompleteHummockVersionDelta, refill_version,
26};
27use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion, HummockVersionDelta};
28use risingwave_hummock_sdk::{
29 CompactionGroupId, HummockEpoch, HummockObjectId, HummockSstableId, HummockSstableObjectId,
30};
31use risingwave_meta_model::hummock_sstable_info::SstableInfoV2Backend;
32use risingwave_meta_model::{
33 HummockVersionId, hummock_epoch_to_version, hummock_sstable_info, hummock_time_travel_delta,
34 hummock_time_travel_version,
35};
36use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta};
37use sea_orm::ActiveValue::Set;
38use sea_orm::{
39 ColumnTrait, Condition, DatabaseTransaction, EntityTrait, PaginatorTrait, QueryFilter,
40 QueryOrder, QuerySelect, TransactionTrait,
41};
42use tracing::info;
43
44use crate::hummock::HummockManager;
45use crate::hummock::error::{Error, Result};
46
47impl HummockManager {
49 pub(crate) async fn init_time_travel_state(&self) -> Result<()> {
50 let sql_store = self.env.meta_store_ref();
51 let mut guard = self.versioning.write().await;
52 guard.mark_next_time_travel_version_snapshot();
53
54 guard.last_time_travel_snapshot_sst_ids = HashSet::new();
55 let Some(version) = hummock_time_travel_version::Entity::find()
56 .order_by_desc(hummock_time_travel_version::Column::VersionId)
57 .one(&sql_store.conn)
58 .await?
59 .map(|v| IncompleteHummockVersion::from_persisted_protobuf(&v.version.to_protobuf()))
60 else {
61 return Ok(());
62 };
63 guard.last_time_travel_snapshot_sst_ids = version.get_sst_ids(true);
64 Ok(())
65 }
66
67 pub(crate) async fn truncate_time_travel_metadata(
68 &self,
69 epoch_watermark: HummockEpoch,
70 ) -> Result<()> {
71 let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
72 let sql_store = self.env.meta_store_ref();
73 let txn = sql_store.conn.begin().await?;
74 let version_watermark = hummock_epoch_to_version::Entity::find()
75 .filter(
76 hummock_epoch_to_version::Column::Epoch
77 .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
78 )
79 .order_by_desc(hummock_epoch_to_version::Column::Epoch)
80 .order_by_asc(hummock_epoch_to_version::Column::VersionId)
81 .one(&txn)
82 .await?;
83 let Some(version_watermark) = version_watermark else {
84 txn.commit().await?;
85 return Ok(());
86 };
87 let watermark_version_id = std::cmp::min(
88 version_watermark.version_id,
89 min_pinned_version_id.to_u64().try_into().unwrap(),
90 );
91 let res = hummock_epoch_to_version::Entity::delete_many()
92 .filter(
93 hummock_epoch_to_version::Column::Epoch
94 .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
95 )
96 .exec(&txn)
97 .await?;
98 tracing::debug!(
99 epoch_watermark,
100 "delete {} rows from hummock_epoch_to_version",
101 res.rows_affected
102 );
103 let latest_valid_version = hummock_time_travel_version::Entity::find()
104 .filter(hummock_time_travel_version::Column::VersionId.lte(watermark_version_id))
105 .order_by_desc(hummock_time_travel_version::Column::VersionId)
106 .one(&txn)
107 .await?
108 .map(|m| IncompleteHummockVersion::from_persisted_protobuf(&m.version.to_protobuf()));
109 let Some(latest_valid_version) = latest_valid_version else {
110 txn.commit().await?;
111 return Ok(());
112 };
113 let (
114 latest_valid_version_id,
115 latest_valid_version_sst_ids,
116 latest_valid_version_object_ids,
117 ) = {
118 (
119 latest_valid_version.id,
120 latest_valid_version.get_sst_ids(true),
121 latest_valid_version
122 .get_object_ids(true)
123 .collect::<HashSet<_>>(),
124 )
125 };
126 let mut object_ids_to_delete: HashSet<_> = HashSet::default();
127 let version_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
128 hummock_time_travel_version::Entity::find()
129 .select_only()
130 .column(hummock_time_travel_version::Column::VersionId)
131 .filter(
132 hummock_time_travel_version::Column::VersionId
133 .lt(latest_valid_version_id.to_u64()),
134 )
135 .order_by_desc(hummock_time_travel_version::Column::VersionId)
136 .into_tuple()
137 .all(&txn)
138 .await?;
139 let delta_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
140 hummock_time_travel_delta::Entity::find()
141 .select_only()
142 .column(hummock_time_travel_delta::Column::VersionId)
143 .filter(
144 hummock_time_travel_delta::Column::VersionId
145 .lt(latest_valid_version_id.to_u64()),
146 )
147 .into_tuple()
148 .all(&txn)
149 .await?;
150 let delete_sst_batch_size = self
152 .env
153 .opts
154 .hummock_time_travel_epoch_version_insert_batch_size;
155 let mut sst_ids_to_delete: HashSet<_> = HashSet::default();
156 async fn delete_sst_in_batch(
157 txn: &DatabaseTransaction,
158 sst_ids_to_delete: HashSet<HummockSstableId>,
159 delete_sst_batch_size: usize,
160 ) -> Result<()> {
161 for start_idx in 0..=(sst_ids_to_delete.len().saturating_sub(1) / delete_sst_batch_size)
162 {
163 hummock_sstable_info::Entity::delete_many()
164 .filter(
165 hummock_sstable_info::Column::SstId.is_in(
166 sst_ids_to_delete
167 .iter()
168 .skip(start_idx * delete_sst_batch_size)
169 .take(delete_sst_batch_size)
170 .map(|sst_id| sst_id.inner()),
171 ),
172 )
173 .exec(txn)
174 .await?;
175 }
176 Ok(())
177 }
178 for delta_id_to_delete in delta_ids_to_delete {
179 let delta_to_delete = hummock_time_travel_delta::Entity::find_by_id(delta_id_to_delete)
180 .one(&txn)
181 .await?
182 .ok_or_else(|| {
183 Error::TimeTravel(anyhow!(format!(
184 "version delta {} not found",
185 delta_id_to_delete
186 )))
187 })?;
188 let delta_to_delete = IncompleteHummockVersionDelta::from_persisted_protobuf(
189 &delta_to_delete.version_delta.to_protobuf(),
190 );
191 let new_sst_ids = delta_to_delete.newly_added_sst_ids(true);
192 sst_ids_to_delete.extend(&new_sst_ids - &latest_valid_version_sst_ids);
194 if sst_ids_to_delete.len() >= delete_sst_batch_size {
195 delete_sst_in_batch(
196 &txn,
197 std::mem::take(&mut sst_ids_to_delete),
198 delete_sst_batch_size,
199 )
200 .await?;
201 }
202 let new_object_ids = delta_to_delete.newly_added_object_ids(true);
203 object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
204 }
205 let mut next_version_sst_ids = latest_valid_version_sst_ids;
206 for prev_version_id in version_ids_to_delete {
207 let prev_version = {
208 let prev_version = hummock_time_travel_version::Entity::find_by_id(prev_version_id)
209 .one(&txn)
210 .await?
211 .ok_or_else(|| {
212 Error::TimeTravel(anyhow!(format!(
213 "prev_version {} not found",
214 prev_version_id
215 )))
216 })?;
217 IncompleteHummockVersion::from_persisted_protobuf(
218 &prev_version.version.to_protobuf(),
219 )
220 };
221 let sst_ids = prev_version.get_sst_ids(true);
222 sst_ids_to_delete.extend(&sst_ids - &next_version_sst_ids);
224 if sst_ids_to_delete.len() >= delete_sst_batch_size {
225 delete_sst_in_batch(
226 &txn,
227 std::mem::take(&mut sst_ids_to_delete),
228 delete_sst_batch_size,
229 )
230 .await?;
231 }
232 let new_object_ids: HashSet<_> = prev_version.get_object_ids(true).collect();
233 object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
234 next_version_sst_ids = sst_ids;
235 }
236 if !sst_ids_to_delete.is_empty() {
237 delete_sst_in_batch(&txn, sst_ids_to_delete, delete_sst_batch_size).await?;
238 }
239
240 if !object_ids_to_delete.is_empty() {
241 self.gc_manager
244 .add_may_delete_object_ids(object_ids_to_delete.into_iter());
245 }
246
247 let res = hummock_time_travel_version::Entity::delete_many()
248 .filter(
249 hummock_time_travel_version::Column::VersionId.lt(latest_valid_version_id.to_u64()),
250 )
251 .exec(&txn)
252 .await?;
253 tracing::debug!(
254 epoch_watermark_version_id = ?watermark_version_id,
255 ?latest_valid_version_id,
256 "delete {} rows from hummock_time_travel_version",
257 res.rows_affected
258 );
259
260 let res = hummock_time_travel_delta::Entity::delete_many()
261 .filter(
262 hummock_time_travel_delta::Column::VersionId.lt(latest_valid_version_id.to_u64()),
263 )
264 .exec(&txn)
265 .await?;
266 tracing::debug!(
267 epoch_watermark_version_id = ?watermark_version_id,
268 ?latest_valid_version_id,
269 "delete {} rows from hummock_time_travel_delta",
270 res.rows_affected
271 );
272
273 txn.commit().await?;
274 Ok(())
275 }
276
277 pub(crate) async fn filter_out_objects_by_time_travel_v1(
278 &self,
279 objects: impl Iterator<Item = HummockObjectId>,
280 ) -> Result<HashSet<HummockObjectId>> {
281 let batch_size = self
282 .env
283 .opts
284 .hummock_time_travel_filter_out_objects_batch_size;
285 info!("filter out objects by time travel v1, only sst will remain in the result set");
286 let mut result: HashSet<_> = objects
289 .filter(|object_id| match object_id {
290 HummockObjectId::Sstable(_) => true,
291 HummockObjectId::VectorFile(_) | HummockObjectId::HnswGraphFile(_) => false,
292 })
293 .collect();
294 let mut remain_sst: VecDeque<_> = result.iter().copied().collect();
295 while !remain_sst.is_empty() {
296 let batch = remain_sst
297 .drain(..std::cmp::min(remain_sst.len(), batch_size))
298 .map(|object_id| object_id.as_raw().inner());
299 let reject_object_ids: Vec<risingwave_meta_model::HummockSstableObjectId> =
300 hummock_sstable_info::Entity::find()
301 .filter(hummock_sstable_info::Column::ObjectId.is_in(batch))
302 .select_only()
303 .column(hummock_sstable_info::Column::ObjectId)
304 .into_tuple()
305 .all(&self.env.meta_store_ref().conn)
306 .await?;
307 for reject in reject_object_ids {
308 let reject: u64 = reject.try_into().unwrap();
309 let object_id = HummockObjectId::Sstable(HummockSstableObjectId::from(reject));
310 result.remove(&object_id);
311 }
312 }
313 Ok(result)
314 }
315
316 pub(crate) async fn filter_out_objects_by_time_travel(
317 &self,
318 objects: impl Iterator<Item = HummockObjectId>,
319 ) -> Result<HashSet<HummockObjectId>> {
320 if self.env.opts.hummock_time_travel_filter_out_objects_v1 {
321 return self.filter_out_objects_by_time_travel_v1(objects).await;
322 }
323 let mut result: HashSet<_> = objects.collect();
324
325 {
327 let mut prev_version_id: Option<HummockVersionId> = None;
328 loop {
329 let query = hummock_time_travel_version::Entity::find();
330 let query = if let Some(prev_version_id) = prev_version_id {
331 query.filter(hummock_time_travel_version::Column::VersionId.gt(prev_version_id))
332 } else {
333 query
334 };
335 let mut version_stream = query
336 .order_by_asc(hummock_time_travel_version::Column::VersionId)
337 .limit(
338 self.env
339 .opts
340 .hummock_time_travel_filter_out_objects_list_version_batch_size
341 as u64,
342 )
343 .stream(&self.env.meta_store_ref().conn)
344 .await?;
345 let mut next_prev_version_id = None;
346 while let Some(model) = version_stream.try_next().await? {
347 let version =
348 HummockVersion::from_persisted_protobuf(&model.version.to_protobuf());
349 for object_id in version.get_object_ids(true) {
350 result.remove(&object_id);
351 }
352 next_prev_version_id = Some(model.version_id);
353 }
354 if let Some(next_prev_version_id) = next_prev_version_id {
355 prev_version_id = Some(next_prev_version_id);
356 } else {
357 break;
358 }
359 }
360 }
361
362 {
364 let mut prev_version_id: Option<HummockVersionId> = None;
365 loop {
366 let query = hummock_time_travel_delta::Entity::find();
367 let query = if let Some(prev_version_id) = prev_version_id {
368 query.filter(hummock_time_travel_delta::Column::VersionId.gt(prev_version_id))
369 } else {
370 query
371 };
372 let mut version_stream = query
373 .order_by_asc(hummock_time_travel_delta::Column::VersionId)
374 .limit(
375 self.env
376 .opts
377 .hummock_time_travel_filter_out_objects_list_delta_batch_size
378 as u64,
379 )
380 .stream(&self.env.meta_store_ref().conn)
381 .await?;
382 let mut next_prev_version_id = None;
383 while let Some(model) = version_stream.try_next().await? {
384 let version_delta = HummockVersionDelta::from_persisted_protobuf(
385 &model.version_delta.to_protobuf(),
386 );
387 for object_id in version_delta.newly_added_object_ids(true) {
389 result.remove(&object_id);
390 }
391 next_prev_version_id = Some(model.version_id);
392 }
393 if let Some(next_prev_version_id) = next_prev_version_id {
394 prev_version_id = Some(next_prev_version_id);
395 } else {
396 break;
397 }
398 }
399 }
400
401 Ok(result)
402 }
403
404 pub(crate) async fn time_travel_pinned_object_count(&self) -> Result<u64> {
405 let count = hummock_sstable_info::Entity::find()
406 .count(&self.env.meta_store_ref().conn)
407 .await?;
408 Ok(count)
409 }
410
411 pub async fn epoch_to_version(
417 &self,
418 query_epoch: HummockEpoch,
419 table_id: TableId,
420 ) -> Result<HummockVersion> {
421 let sql_store = self.env.meta_store_ref();
422 let _permit = self.inflight_time_travel_query.try_acquire().map_err(|_| {
423 anyhow!(format!(
424 "too many inflight time travel queries, max_inflight_time_travel_query={}",
425 self.env.opts.max_inflight_time_travel_query
426 ))
427 })?;
428 let epoch_to_version = hummock_epoch_to_version::Entity::find()
429 .filter(
430 Condition::any()
431 .add(
432 hummock_epoch_to_version::Column::TableId
433 .eq(i64::from(table_id.as_raw_id())),
434 )
435 .add(hummock_epoch_to_version::Column::TableId.eq(0)),
437 )
438 .filter(
439 hummock_epoch_to_version::Column::Epoch
440 .lte(risingwave_meta_model::Epoch::try_from(query_epoch).unwrap()),
441 )
442 .order_by_desc(hummock_epoch_to_version::Column::Epoch)
443 .one(&sql_store.conn)
444 .await?
445 .ok_or_else(|| {
446 Error::TimeTravel(anyhow!(format!(
447 "version not found for epoch {}",
448 query_epoch
449 )))
450 })?;
451 let timer = self
452 .metrics
453 .time_travel_version_replay_latency
454 .start_timer();
455 let actual_version_id = epoch_to_version.version_id;
456 tracing::debug!(
457 query_epoch,
458 query_tz = ?(Epoch(query_epoch).as_timestamptz()),
459 actual_epoch = epoch_to_version.epoch,
460 actual_tz = ?(Epoch(u64::try_from(epoch_to_version.epoch).unwrap()).as_timestamptz()),
461 actual_version_id,
462 "convert query epoch"
463 );
464
465 let replay_version = hummock_time_travel_version::Entity::find()
466 .filter(hummock_time_travel_version::Column::VersionId.lte(actual_version_id))
467 .order_by_desc(hummock_time_travel_version::Column::VersionId)
468 .one(&sql_store.conn)
469 .await?
470 .ok_or_else(|| {
471 Error::TimeTravel(anyhow!(format!(
472 "no replay version found for epoch {}, version {}",
473 query_epoch, actual_version_id,
474 )))
475 })?;
476 let deltas = hummock_time_travel_delta::Entity::find()
477 .filter(hummock_time_travel_delta::Column::VersionId.gt(replay_version.version_id))
478 .filter(hummock_time_travel_delta::Column::VersionId.lte(actual_version_id))
479 .order_by_asc(hummock_time_travel_delta::Column::VersionId)
480 .all(&sql_store.conn)
481 .await?;
482 let mut actual_version = replay_archive(
484 replay_version.version.to_protobuf(),
485 deltas.into_iter().map(|d| d.version_delta.to_protobuf()),
486 );
487
488 let mut sst_ids = actual_version
489 .get_sst_ids(true)
490 .into_iter()
491 .collect::<VecDeque<_>>();
492 let sst_count = sst_ids.len();
493 let mut sst_id_to_info = HashMap::with_capacity(sst_count);
494 let sst_info_fetch_batch_size = self.env.opts.hummock_time_travel_sst_info_fetch_batch_size;
495 while !sst_ids.is_empty() {
496 let sst_infos = hummock_sstable_info::Entity::find()
497 .filter(
498 hummock_sstable_info::Column::SstId.is_in(
499 sst_ids
500 .drain(..std::cmp::min(sst_info_fetch_batch_size, sst_ids.len()))
501 .map(|sst_id| sst_id.inner()),
502 ),
503 )
504 .all(&sql_store.conn)
505 .await?;
506 for sst_info in sst_infos {
507 let sst_info: SstableInfo = sst_info.sstable_info.to_protobuf().into();
508 sst_id_to_info.insert(sst_info.sst_id, sst_info);
509 }
510 }
511 if sst_count != sst_id_to_info.len() {
512 return Err(Error::TimeTravel(anyhow!(format!(
513 "some SstableInfos not found for epoch {}, version {}",
514 query_epoch, actual_version_id,
515 ))));
516 }
517 refill_version(&mut actual_version, &sst_id_to_info, table_id);
518 timer.observe_duration();
519 Ok(actual_version)
520 }
521
522 pub(crate) async fn write_time_travel_metadata(
523 &self,
524 txn: &DatabaseTransaction,
525 version: Option<&HummockVersion>,
526 delta: HummockVersionDelta,
527 time_travel_table_ids: HashSet<StateTableId>,
528 skip_sst_ids: &HashSet<HummockSstableId>,
529 tables_to_commit: impl Iterator<Item = (&TableId, &CompactionGroupId, u64)>,
530 ) -> Result<Option<HashSet<HummockSstableId>>> {
531 if self
532 .env
533 .system_params_reader()
534 .await
535 .time_travel_retention_ms()
536 == 0
537 {
538 return Ok(None);
539 }
540 async fn write_sstable_infos(
541 mut sst_infos: impl Iterator<Item = &SstableInfo>,
542 txn: &DatabaseTransaction,
543 batch_size: usize,
544 ) -> Result<usize> {
545 let mut count = 0;
546 let mut is_finished = false;
547 while !is_finished {
548 let mut remain = batch_size;
549 let mut batch = vec![];
550 while remain > 0 {
551 let Some(sst_info) = sst_infos.next() else {
552 is_finished = true;
553 break;
554 };
555 batch.push(hummock_sstable_info::ActiveModel {
556 sst_id: Set(sst_info.sst_id.inner().try_into().unwrap()),
557 object_id: Set(sst_info.object_id.inner().try_into().unwrap()),
558 sstable_info: Set(SstableInfoV2Backend::from(&sst_info.to_protobuf())),
559 });
560 remain -= 1;
561 count += 1;
562 }
563 if batch.is_empty() {
564 break;
565 }
566 hummock_sstable_info::Entity::insert_many(batch)
567 .on_conflict_do_nothing()
568 .exec(txn)
569 .await?;
570 }
571 Ok(count)
572 }
573
574 let mut batch = vec![];
575 for (table_id, _cg_id, committed_epoch) in tables_to_commit {
576 let version_id: u64 = delta.id.to_u64();
577 let m = hummock_epoch_to_version::ActiveModel {
578 epoch: Set(committed_epoch.try_into().unwrap()),
579 table_id: Set(table_id.as_raw_id() as _),
580 version_id: Set(version_id.try_into().unwrap()),
581 };
582 batch.push(m);
583 if batch.len()
584 >= self
585 .env
586 .opts
587 .hummock_time_travel_epoch_version_insert_batch_size
588 {
589 hummock_epoch_to_version::Entity::insert_many(std::mem::take(&mut batch))
591 .do_nothing()
592 .exec(txn)
593 .await?;
594 }
595 }
596 if !batch.is_empty() {
597 hummock_epoch_to_version::Entity::insert_many(batch)
599 .do_nothing()
600 .exec(txn)
601 .await?;
602 }
603
604 let mut version_sst_ids = None;
605 if let Some(version) = version {
606 version_sst_ids = Some(
608 version
609 .get_sst_infos(true)
610 .filter_map(|s| {
611 if s.table_ids
612 .iter()
613 .any(|tid| time_travel_table_ids.contains(tid))
614 {
615 return Some(s.sst_id);
616 }
617 None
618 })
619 .collect(),
620 );
621 write_sstable_infos(
622 version.get_sst_infos(true).filter(|s| {
623 !skip_sst_ids.contains(&s.sst_id)
624 && s.table_ids
625 .iter()
626 .any(|tid| time_travel_table_ids.contains(tid))
627 }),
628 txn,
629 self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
630 )
631 .await?;
632 let m = hummock_time_travel_version::ActiveModel {
633 version_id: Set(risingwave_meta_model::HummockVersionId::try_from(
634 version.id.to_u64(),
635 )
636 .unwrap()),
637 version: Set(
638 (&IncompleteHummockVersion::from((version, &time_travel_table_ids))
639 .to_protobuf())
640 .into(),
641 ),
642 };
643 hummock_time_travel_version::Entity::insert(m)
644 .on_conflict_do_nothing()
645 .exec(txn)
646 .await?;
647 return Ok(version_sst_ids);
649 }
650 let written = write_sstable_infos(
651 delta.newly_added_sst_infos(true).filter(|s| {
652 !skip_sst_ids.contains(&s.sst_id)
653 && s.table_ids
654 .iter()
655 .any(|tid| time_travel_table_ids.contains(tid))
656 }),
657 txn,
658 self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
659 )
660 .await?;
661 if written > 0 {
663 let m = hummock_time_travel_delta::ActiveModel {
664 version_id: Set(risingwave_meta_model::HummockVersionId::try_from(
665 delta.id.to_u64(),
666 )
667 .unwrap()),
668 version_delta: Set((&IncompleteHummockVersionDelta::from((
669 &delta,
670 &time_travel_table_ids,
671 ))
672 .to_protobuf())
673 .into()),
674 };
675 hummock_time_travel_delta::Entity::insert(m)
676 .on_conflict_do_nothing()
677 .exec(txn)
678 .await?;
679 }
680
681 Ok(version_sst_ids)
682 }
683}
684
685fn replay_archive(
687 version: PbHummockVersion,
688 deltas: impl Iterator<Item = PbHummockVersionDelta>,
689) -> HummockVersion {
690 let mut last_version = HummockVersion::from_persisted_protobuf(&version);
693 for d in deltas {
694 let d = HummockVersionDelta::from_persisted_protobuf(&d);
695 debug_assert!(
696 !should_mark_next_time_travel_version_snapshot(&d),
697 "unexpected time travel delta {:?}",
698 d
699 );
700 while last_version.id < d.prev_id {
703 last_version.id = last_version.id + 1;
704 }
705 last_version.apply_version_delta(&d);
706 }
707 last_version
708}
709
710pub fn require_sql_meta_store_err() -> Error {
711 Error::TimeTravel(anyhow!("require SQL meta store"))
712}
713
714pub fn should_mark_next_time_travel_version_snapshot(delta: &HummockVersionDelta) -> bool {
716 delta.group_deltas.iter().any(|(_, deltas)| {
717 deltas
718 .group_deltas
719 .iter()
720 .any(|d| !matches!(d, GroupDeltaCommon::NewL0SubLevel(_)))
721 })
722}