// risingwave_meta/hummock/manager/time_travel.rs
use std::collections::{HashMap, HashSet, VecDeque};
16
17use anyhow::anyhow;
18use risingwave_common::catalog::TableId;
19use risingwave_common::system_param::reader::SystemParamsRead;
20use risingwave_common::util::epoch::Epoch;
21use risingwave_hummock_sdk::compaction_group::StateTableId;
22use risingwave_hummock_sdk::sstable_info::SstableInfo;
23use risingwave_hummock_sdk::time_travel::{
24 IncompleteHummockVersion, IncompleteHummockVersionDelta, refill_version,
25};
26use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion, HummockVersionDelta};
27use risingwave_hummock_sdk::{
28 CompactionGroupId, HummockEpoch, HummockSstableId, HummockSstableObjectId,
29};
30use risingwave_meta_model::hummock_sstable_info::SstableInfoV2Backend;
31use risingwave_meta_model::{
32 hummock_epoch_to_version, hummock_sstable_info, hummock_time_travel_delta,
33 hummock_time_travel_version,
34};
35use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta};
36use sea_orm::ActiveValue::Set;
37use sea_orm::{
38 ColumnTrait, Condition, DatabaseTransaction, EntityTrait, PaginatorTrait, QueryFilter,
39 QueryOrder, QuerySelect, TransactionTrait,
40};
41
42use crate::hummock::HummockManager;
43use crate::hummock::error::{Error, Result};
44
45impl HummockManager {
47 pub(crate) async fn init_time_travel_state(&self) -> Result<()> {
48 let sql_store = self.env.meta_store_ref();
49 let mut guard = self.versioning.write().await;
50 guard.mark_next_time_travel_version_snapshot();
51
52 guard.last_time_travel_snapshot_sst_ids = HashSet::new();
53 let Some(version) = hummock_time_travel_version::Entity::find()
54 .order_by_desc(hummock_time_travel_version::Column::VersionId)
55 .one(&sql_store.conn)
56 .await?
57 .map(|v| IncompleteHummockVersion::from_persisted_protobuf(&v.version.to_protobuf()))
58 else {
59 return Ok(());
60 };
61 guard.last_time_travel_snapshot_sst_ids = version.get_sst_ids(true);
62 Ok(())
63 }
64
65 pub(crate) async fn truncate_time_travel_metadata(
66 &self,
67 epoch_watermark: HummockEpoch,
68 ) -> Result<()> {
69 let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
70 let sql_store = self.env.meta_store_ref();
71 let txn = sql_store.conn.begin().await?;
72 let version_watermark = hummock_epoch_to_version::Entity::find()
73 .filter(
74 hummock_epoch_to_version::Column::Epoch
75 .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
76 )
77 .order_by_desc(hummock_epoch_to_version::Column::Epoch)
78 .order_by_asc(hummock_epoch_to_version::Column::VersionId)
79 .one(&txn)
80 .await?;
81 let Some(version_watermark) = version_watermark else {
82 txn.commit().await?;
83 return Ok(());
84 };
85 let watermark_version_id = std::cmp::min(
86 version_watermark.version_id,
87 min_pinned_version_id.to_u64().try_into().unwrap(),
88 );
89 let res = hummock_epoch_to_version::Entity::delete_many()
90 .filter(
91 hummock_epoch_to_version::Column::Epoch
92 .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
93 )
94 .exec(&txn)
95 .await?;
96 tracing::debug!(
97 epoch_watermark,
98 "delete {} rows from hummock_epoch_to_version",
99 res.rows_affected
100 );
101 let latest_valid_version = hummock_time_travel_version::Entity::find()
102 .filter(hummock_time_travel_version::Column::VersionId.lte(watermark_version_id))
103 .order_by_desc(hummock_time_travel_version::Column::VersionId)
104 .one(&txn)
105 .await?
106 .map(|m| IncompleteHummockVersion::from_persisted_protobuf(&m.version.to_protobuf()));
107 let Some(latest_valid_version) = latest_valid_version else {
108 txn.commit().await?;
109 return Ok(());
110 };
111 let (
112 latest_valid_version_id,
113 latest_valid_version_sst_ids,
114 latest_valid_version_object_ids,
115 ) = {
116 (
117 latest_valid_version.id,
118 latest_valid_version.get_sst_ids(true),
119 latest_valid_version.get_object_ids(true),
120 )
121 };
122 let mut object_ids_to_delete: HashSet<_> = HashSet::default();
123 let version_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
124 hummock_time_travel_version::Entity::find()
125 .select_only()
126 .column(hummock_time_travel_version::Column::VersionId)
127 .filter(
128 hummock_time_travel_version::Column::VersionId
129 .lt(latest_valid_version_id.to_u64()),
130 )
131 .order_by_desc(hummock_time_travel_version::Column::VersionId)
132 .into_tuple()
133 .all(&txn)
134 .await?;
135 let delta_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
136 hummock_time_travel_delta::Entity::find()
137 .select_only()
138 .column(hummock_time_travel_delta::Column::VersionId)
139 .filter(
140 hummock_time_travel_delta::Column::VersionId
141 .lt(latest_valid_version_id.to_u64()),
142 )
143 .into_tuple()
144 .all(&txn)
145 .await?;
146 let delete_sst_batch_size = self
148 .env
149 .opts
150 .hummock_time_travel_epoch_version_insert_batch_size;
151 let mut sst_ids_to_delete: HashSet<_> = HashSet::default();
152 async fn delete_sst_in_batch(
153 txn: &DatabaseTransaction,
154 sst_ids_to_delete: HashSet<HummockSstableId>,
155 delete_sst_batch_size: usize,
156 ) -> Result<()> {
157 for start_idx in 0..=(sst_ids_to_delete.len().saturating_sub(1) / delete_sst_batch_size)
158 {
159 hummock_sstable_info::Entity::delete_many()
160 .filter(
161 hummock_sstable_info::Column::SstId.is_in(
162 sst_ids_to_delete
163 .iter()
164 .skip(start_idx * delete_sst_batch_size)
165 .take(delete_sst_batch_size)
166 .copied(),
167 ),
168 )
169 .exec(txn)
170 .await?;
171 }
172 Ok(())
173 }
174 for delta_id_to_delete in delta_ids_to_delete {
175 let delta_to_delete = hummock_time_travel_delta::Entity::find_by_id(delta_id_to_delete)
176 .one(&txn)
177 .await?
178 .ok_or_else(|| {
179 Error::TimeTravel(anyhow!(format!(
180 "version delta {} not found",
181 delta_id_to_delete
182 )))
183 })?;
184 let delta_to_delete = IncompleteHummockVersionDelta::from_persisted_protobuf(
185 &delta_to_delete.version_delta.to_protobuf(),
186 );
187 let new_sst_ids = delta_to_delete.newly_added_sst_ids(true);
188 sst_ids_to_delete.extend(&new_sst_ids - &latest_valid_version_sst_ids);
190 if sst_ids_to_delete.len() >= delete_sst_batch_size {
191 delete_sst_in_batch(
192 &txn,
193 std::mem::take(&mut sst_ids_to_delete),
194 delete_sst_batch_size,
195 )
196 .await?;
197 }
198 let new_object_ids = delta_to_delete.newly_added_object_ids(true);
199 object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
200 }
201 let mut next_version_sst_ids = latest_valid_version_sst_ids;
202 for prev_version_id in version_ids_to_delete {
203 let prev_version = {
204 let prev_version = hummock_time_travel_version::Entity::find_by_id(prev_version_id)
205 .one(&txn)
206 .await?
207 .ok_or_else(|| {
208 Error::TimeTravel(anyhow!(format!(
209 "prev_version {} not found",
210 prev_version_id
211 )))
212 })?;
213 IncompleteHummockVersion::from_persisted_protobuf(
214 &prev_version.version.to_protobuf(),
215 )
216 };
217 let sst_ids = prev_version.get_sst_ids(true);
218 sst_ids_to_delete.extend(&sst_ids - &next_version_sst_ids);
220 if sst_ids_to_delete.len() >= delete_sst_batch_size {
221 delete_sst_in_batch(
222 &txn,
223 std::mem::take(&mut sst_ids_to_delete),
224 delete_sst_batch_size,
225 )
226 .await?;
227 }
228 let new_object_ids = prev_version.get_object_ids(true);
229 object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
230 next_version_sst_ids = sst_ids;
231 }
232 if !sst_ids_to_delete.is_empty() {
233 delete_sst_in_batch(&txn, sst_ids_to_delete, delete_sst_batch_size).await?;
234 }
235
236 if !object_ids_to_delete.is_empty() {
237 self.gc_manager
240 .add_may_delete_object_ids(object_ids_to_delete.into_iter());
241 }
242
243 let res = hummock_time_travel_version::Entity::delete_many()
244 .filter(
245 hummock_time_travel_version::Column::VersionId.lt(latest_valid_version_id.to_u64()),
246 )
247 .exec(&txn)
248 .await?;
249 tracing::debug!(
250 epoch_watermark_version_id = ?watermark_version_id,
251 ?latest_valid_version_id,
252 "delete {} rows from hummock_time_travel_version",
253 res.rows_affected
254 );
255
256 let res = hummock_time_travel_delta::Entity::delete_many()
257 .filter(
258 hummock_time_travel_delta::Column::VersionId.lt(latest_valid_version_id.to_u64()),
259 )
260 .exec(&txn)
261 .await?;
262 tracing::debug!(
263 epoch_watermark_version_id = ?watermark_version_id,
264 ?latest_valid_version_id,
265 "delete {} rows from hummock_time_travel_delta",
266 res.rows_affected
267 );
268
269 txn.commit().await?;
270 Ok(())
271 }
272
273 pub(crate) async fn filter_out_objects_by_time_travel(
274 &self,
275 objects: impl Iterator<Item = HummockSstableObjectId>,
276 batch_size: usize,
277 ) -> Result<HashSet<HummockSstableObjectId>> {
278 let mut result: HashSet<_> = objects.collect();
281 let mut remain: VecDeque<_> = result.iter().copied().collect();
282 while !remain.is_empty() {
283 let batch = remain.drain(..std::cmp::min(remain.len(), batch_size));
284 let reject_object_ids: Vec<risingwave_meta_model::HummockSstableObjectId> =
285 hummock_sstable_info::Entity::find()
286 .filter(hummock_sstable_info::Column::ObjectId.is_in(batch))
287 .select_only()
288 .column(hummock_sstable_info::Column::ObjectId)
289 .into_tuple()
290 .all(&self.env.meta_store_ref().conn)
291 .await?;
292 for reject in reject_object_ids {
293 let object_id = HummockSstableObjectId::try_from(reject).unwrap();
294 result.remove(&object_id);
295 }
296 }
297 Ok(result)
298 }
299
300 pub(crate) async fn time_travel_pinned_object_count(&self) -> Result<u64> {
301 let count = hummock_sstable_info::Entity::find()
302 .count(&self.env.meta_store_ref().conn)
303 .await?;
304 Ok(count)
305 }
306
307 pub async fn epoch_to_version(
313 &self,
314 query_epoch: HummockEpoch,
315 table_id: u32,
316 ) -> Result<HummockVersion> {
317 let sql_store = self.env.meta_store_ref();
318 let _permit = self.inflight_time_travel_query.try_acquire().map_err(|_| {
319 anyhow!(format!(
320 "too many inflight time travel queries, max_inflight_time_travel_query={}",
321 self.env.opts.max_inflight_time_travel_query
322 ))
323 })?;
324 let epoch_to_version = hummock_epoch_to_version::Entity::find()
325 .filter(
326 Condition::any()
327 .add(hummock_epoch_to_version::Column::TableId.eq(i64::from(table_id)))
328 .add(hummock_epoch_to_version::Column::TableId.eq(0)),
330 )
331 .filter(
332 hummock_epoch_to_version::Column::Epoch
333 .lte(risingwave_meta_model::Epoch::try_from(query_epoch).unwrap()),
334 )
335 .order_by_desc(hummock_epoch_to_version::Column::Epoch)
336 .one(&sql_store.conn)
337 .await?
338 .ok_or_else(|| {
339 Error::TimeTravel(anyhow!(format!(
340 "version not found for epoch {}",
341 query_epoch
342 )))
343 })?;
344 let timer = self
345 .metrics
346 .time_travel_version_replay_latency
347 .start_timer();
348 let actual_version_id = epoch_to_version.version_id;
349 tracing::debug!(
350 query_epoch,
351 query_tz = ?(Epoch(query_epoch).as_timestamptz()),
352 actual_epoch = epoch_to_version.epoch,
353 actual_tz = ?(Epoch(u64::try_from(epoch_to_version.epoch).unwrap()).as_timestamptz()),
354 actual_version_id,
355 "convert query epoch"
356 );
357
358 let replay_version = hummock_time_travel_version::Entity::find()
359 .filter(hummock_time_travel_version::Column::VersionId.lte(actual_version_id))
360 .order_by_desc(hummock_time_travel_version::Column::VersionId)
361 .one(&sql_store.conn)
362 .await?
363 .ok_or_else(|| {
364 Error::TimeTravel(anyhow!(format!(
365 "no replay version found for epoch {}, version {}",
366 query_epoch, actual_version_id,
367 )))
368 })?;
369 let deltas = hummock_time_travel_delta::Entity::find()
370 .filter(hummock_time_travel_delta::Column::VersionId.gt(replay_version.version_id))
371 .filter(hummock_time_travel_delta::Column::VersionId.lte(actual_version_id))
372 .order_by_asc(hummock_time_travel_delta::Column::VersionId)
373 .all(&sql_store.conn)
374 .await?;
375 let mut actual_version = replay_archive(
377 replay_version.version.to_protobuf(),
378 deltas.into_iter().map(|d| d.version_delta.to_protobuf()),
379 );
380
381 let mut sst_ids = actual_version
382 .get_sst_ids(true)
383 .into_iter()
384 .collect::<VecDeque<_>>();
385 let sst_count = sst_ids.len();
386 let mut sst_id_to_info = HashMap::with_capacity(sst_count);
387 let sst_info_fetch_batch_size = self.env.opts.hummock_time_travel_sst_info_fetch_batch_size;
388 while !sst_ids.is_empty() {
389 let sst_infos = hummock_sstable_info::Entity::find()
390 .filter(hummock_sstable_info::Column::SstId.is_in(
391 sst_ids.drain(..std::cmp::min(sst_info_fetch_batch_size, sst_ids.len())),
392 ))
393 .all(&sql_store.conn)
394 .await?;
395 for sst_info in sst_infos {
396 let sst_info: SstableInfo = sst_info.sstable_info.to_protobuf().into();
397 sst_id_to_info.insert(sst_info.sst_id, sst_info);
398 }
399 }
400 if sst_count != sst_id_to_info.len() {
401 return Err(Error::TimeTravel(anyhow!(format!(
402 "some SstableInfos not found for epoch {}, version {}",
403 query_epoch, actual_version_id,
404 ))));
405 }
406 refill_version(&mut actual_version, &sst_id_to_info, table_id);
407 timer.observe_duration();
408 Ok(actual_version)
409 }
410
411 pub(crate) async fn write_time_travel_metadata(
412 &self,
413 txn: &DatabaseTransaction,
414 version: Option<&HummockVersion>,
415 delta: HummockVersionDelta,
416 time_travel_table_ids: HashSet<StateTableId>,
417 skip_sst_ids: &HashSet<HummockSstableId>,
418 tables_to_commit: impl Iterator<Item = (&TableId, &CompactionGroupId, u64)>,
419 ) -> Result<Option<HashSet<HummockSstableId>>> {
420 if self
421 .env
422 .system_params_reader()
423 .await
424 .time_travel_retention_ms()
425 == 0
426 {
427 return Ok(None);
428 }
429 async fn write_sstable_infos(
430 mut sst_infos: impl Iterator<Item = &SstableInfo>,
431 txn: &DatabaseTransaction,
432 batch_size: usize,
433 ) -> Result<usize> {
434 let mut count = 0;
435 let mut is_finished = false;
436 while !is_finished {
437 let mut remain = batch_size;
438 let mut batch = vec![];
439 while remain > 0 {
440 let Some(sst_info) = sst_infos.next() else {
441 is_finished = true;
442 break;
443 };
444 batch.push(hummock_sstable_info::ActiveModel {
445 sst_id: Set(sst_info.sst_id.try_into().unwrap()),
446 object_id: Set(sst_info.object_id.try_into().unwrap()),
447 sstable_info: Set(SstableInfoV2Backend::from(&sst_info.to_protobuf())),
448 });
449 remain -= 1;
450 count += 1;
451 }
452 if batch.is_empty() {
453 break;
454 }
455 hummock_sstable_info::Entity::insert_many(batch)
456 .on_conflict_do_nothing()
457 .exec(txn)
458 .await?;
459 }
460 Ok(count)
461 }
462
463 let mut batch = vec![];
464 for (table_id, _cg_id, committed_epoch) in tables_to_commit {
465 let version_id: u64 = delta.id.to_u64();
466 let m = hummock_epoch_to_version::ActiveModel {
467 epoch: Set(committed_epoch.try_into().unwrap()),
468 table_id: Set(table_id.table_id.into()),
469 version_id: Set(version_id.try_into().unwrap()),
470 };
471 batch.push(m);
472 if batch.len()
473 >= self
474 .env
475 .opts
476 .hummock_time_travel_epoch_version_insert_batch_size
477 {
478 hummock_epoch_to_version::Entity::insert_many(std::mem::take(&mut batch))
480 .do_nothing()
481 .exec(txn)
482 .await?;
483 }
484 }
485 if !batch.is_empty() {
486 hummock_epoch_to_version::Entity::insert_many(batch)
488 .do_nothing()
489 .exec(txn)
490 .await?;
491 }
492
493 let mut version_sst_ids = None;
494 if let Some(version) = version {
495 version_sst_ids = Some(
497 version
498 .get_sst_infos(true)
499 .filter_map(|s| {
500 if s.table_ids
501 .iter()
502 .any(|tid| time_travel_table_ids.contains(tid))
503 {
504 return Some(s.sst_id);
505 }
506 None
507 })
508 .collect(),
509 );
510 write_sstable_infos(
511 version.get_sst_infos(true).filter(|s| {
512 !skip_sst_ids.contains(&s.sst_id)
513 && s.table_ids
514 .iter()
515 .any(|tid| time_travel_table_ids.contains(tid))
516 }),
517 txn,
518 self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
519 )
520 .await?;
521 let m = hummock_time_travel_version::ActiveModel {
522 version_id: Set(risingwave_meta_model::HummockVersionId::try_from(
523 version.id.to_u64(),
524 )
525 .unwrap()),
526 version: Set(
527 (&IncompleteHummockVersion::from((version, &time_travel_table_ids))
528 .to_protobuf())
529 .into(),
530 ),
531 };
532 hummock_time_travel_version::Entity::insert(m)
533 .on_conflict_do_nothing()
534 .exec(txn)
535 .await?;
536 return Ok(version_sst_ids);
538 }
539 let written = write_sstable_infos(
540 delta.newly_added_sst_infos(true).filter(|s| {
541 !skip_sst_ids.contains(&s.sst_id)
542 && s.table_ids
543 .iter()
544 .any(|tid| time_travel_table_ids.contains(tid))
545 }),
546 txn,
547 self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
548 )
549 .await?;
550 if written > 0 {
552 let m = hummock_time_travel_delta::ActiveModel {
553 version_id: Set(risingwave_meta_model::HummockVersionId::try_from(
554 delta.id.to_u64(),
555 )
556 .unwrap()),
557 version_delta: Set((&IncompleteHummockVersionDelta::from((
558 &delta,
559 &time_travel_table_ids,
560 ))
561 .to_protobuf())
562 .into()),
563 };
564 hummock_time_travel_delta::Entity::insert(m)
565 .on_conflict_do_nothing()
566 .exec(txn)
567 .await?;
568 }
569
570 Ok(version_sst_ids)
571 }
572}
573
574fn replay_archive(
576 version: PbHummockVersion,
577 deltas: impl Iterator<Item = PbHummockVersionDelta>,
578) -> HummockVersion {
579 let mut last_version = HummockVersion::from_persisted_protobuf(&version);
582 for d in deltas {
583 let d = HummockVersionDelta::from_persisted_protobuf(&d);
584 debug_assert!(
585 !should_mark_next_time_travel_version_snapshot(&d),
586 "unexpected time travel delta {:?}",
587 d
588 );
589 while last_version.id < d.prev_id {
592 last_version.id = last_version.id + 1;
593 }
594 last_version.apply_version_delta(&d);
595 }
596 last_version
597}
598
599pub fn require_sql_meta_store_err() -> Error {
600 Error::TimeTravel(anyhow!("require SQL meta store"))
601}
602
603pub fn should_mark_next_time_travel_version_snapshot(delta: &HummockVersionDelta) -> bool {
605 delta.group_deltas.iter().any(|(_, deltas)| {
606 deltas
607 .group_deltas
608 .iter()
609 .any(|d| !matches!(d, GroupDeltaCommon::NewL0SubLevel(_)))
610 })
611}