use std::collections::{HashMap, HashSet, VecDeque};
use std::future::poll_fn;
use std::ops::Range;
use std::sync::{Arc, LazyLock};
use std::task::{Poll, ready};
use std::time::{Duration, Instant};

use foyer::{HybridCacheEntry, RangeBoundsExt};
use futures::future::{join_all, try_join_all};
use futures::{Future, FutureExt};
use itertools::Itertools;
use prometheus::core::{AtomicU64, GenericCounter, GenericCounterVec};
use prometheus::{
    Histogram, HistogramVec, IntGauge, Registry, register_histogram_vec_with_registry,
    register_int_counter_vec_with_registry, register_int_gauge_with_registry,
};
use risingwave_common::license::Feature;
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo;
use risingwave_hummock_sdk::{HummockSstableObjectId, KeyComparator};
use thiserror_ext::AsReport;
use tokio::sync::Semaphore;
use tokio::task::JoinHandle;

use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::{
    Block, HummockError, HummockResult, Sstable, SstableBlockIndex, SstableStoreRef, TableHolder,
};
use crate::monitor::StoreLocalStatistic;
use crate::opts::StorageOpts;

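/// Process-wide cache refill metrics, lazily registered against the global
/// Prometheus registry on first access.
///
/// A minimal usage sketch; the calls mirror how the refill paths below record
/// their progress:
///
/// ```ignore
/// GLOBAL_CACHE_REFILL_METRICS.data_refill_attempts_total.inc();
/// GLOBAL_CACHE_REFILL_METRICS
///     .data_refill_success_duration
///     .observe(0.05);
/// ```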
pub static GLOBAL_CACHE_REFILL_METRICS: LazyLock<CacheRefillMetrics> =
    LazyLock::new(|| CacheRefillMetrics::new(&GLOBAL_METRICS_REGISTRY));

pub struct CacheRefillMetrics {
    pub refill_duration: HistogramVec,
    pub refill_total: GenericCounterVec<AtomicU64>,
    pub refill_bytes: GenericCounterVec<AtomicU64>,

    pub data_refill_success_duration: Histogram,
    pub meta_refill_success_duration: Histogram,

    pub data_refill_filtered_total: GenericCounter<AtomicU64>,
    pub data_refill_attempts_total: GenericCounter<AtomicU64>,
    pub data_refill_started_total: GenericCounter<AtomicU64>,
    pub meta_refill_attempts_total: GenericCounter<AtomicU64>,

    pub data_refill_parent_meta_lookup_hit_total: GenericCounter<AtomicU64>,
    pub data_refill_parent_meta_lookup_miss_total: GenericCounter<AtomicU64>,
    pub data_refill_unit_inheritance_hit_total: GenericCounter<AtomicU64>,
    pub data_refill_unit_inheritance_miss_total: GenericCounter<AtomicU64>,

    pub data_refill_block_unfiltered_total: GenericCounter<AtomicU64>,
    pub data_refill_block_success_total: GenericCounter<AtomicU64>,

    pub data_refill_ideal_bytes: GenericCounter<AtomicU64>,
    pub data_refill_success_bytes: GenericCounter<AtomicU64>,

    pub refill_queue_total: IntGauge,
}

impl CacheRefillMetrics {
    pub fn new(registry: &Registry) -> Self {
        let refill_duration = register_histogram_vec_with_registry!(
            "refill_duration",
            "refill duration",
            &["type", "op"],
            registry,
        )
        .unwrap();
        let refill_total = register_int_counter_vec_with_registry!(
            "refill_total",
            "refill total",
            &["type", "op"],
            registry,
        )
        .unwrap();
        let refill_bytes = register_int_counter_vec_with_registry!(
            "refill_bytes",
            "refill bytes",
            &["type", "op"],
            registry,
        )
        .unwrap();

        let data_refill_success_duration = refill_duration
            .get_metric_with_label_values(&["data", "success"])
            .unwrap();
        let meta_refill_success_duration = refill_duration
            .get_metric_with_label_values(&["meta", "success"])
            .unwrap();

        let data_refill_filtered_total = refill_total
            .get_metric_with_label_values(&["data", "filtered"])
            .unwrap();
        let data_refill_attempts_total = refill_total
            .get_metric_with_label_values(&["data", "attempts"])
            .unwrap();
        let data_refill_started_total = refill_total
            .get_metric_with_label_values(&["data", "started"])
            .unwrap();
        let meta_refill_attempts_total = refill_total
            .get_metric_with_label_values(&["meta", "attempts"])
            .unwrap();

        let data_refill_parent_meta_lookup_hit_total = refill_total
            .get_metric_with_label_values(&["parent_meta", "hit"])
            .unwrap();
        let data_refill_parent_meta_lookup_miss_total = refill_total
            .get_metric_with_label_values(&["parent_meta", "miss"])
            .unwrap();
        let data_refill_unit_inheritance_hit_total = refill_total
            .get_metric_with_label_values(&["unit_inheritance", "hit"])
            .unwrap();
        let data_refill_unit_inheritance_miss_total = refill_total
            .get_metric_with_label_values(&["unit_inheritance", "miss"])
            .unwrap();

        let data_refill_block_unfiltered_total = refill_total
            .get_metric_with_label_values(&["block", "unfiltered"])
            .unwrap();
        let data_refill_block_success_total = refill_total
            .get_metric_with_label_values(&["block", "success"])
            .unwrap();

        let data_refill_ideal_bytes = refill_bytes
            .get_metric_with_label_values(&["data", "ideal"])
            .unwrap();
        let data_refill_success_bytes = refill_bytes
            .get_metric_with_label_values(&["data", "success"])
            .unwrap();

        let refill_queue_total = register_int_gauge_with_registry!(
            "refill_queue_total",
            "refill queue total",
            registry,
        )
        .unwrap();

        Self {
            refill_duration,
            refill_total,
            refill_bytes,

            data_refill_success_duration,
            meta_refill_success_duration,
            data_refill_filtered_total,
            data_refill_attempts_total,
            data_refill_started_total,
            meta_refill_attempts_total,

            data_refill_parent_meta_lookup_hit_total,
            data_refill_parent_meta_lookup_miss_total,
            data_refill_unit_inheritance_hit_total,
            data_refill_unit_inheritance_miss_total,

            data_refill_block_unfiltered_total,
            data_refill_block_success_total,

            data_refill_ideal_bytes,
            data_refill_success_bytes,

            refill_queue_total,
        }
    }
}

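/// Configuration for meta and data cache refill on version updates.
///
/// A hand-rolled construction sketch; the concrete values are illustrative
/// assumptions, not defaults of this crate (`from_storage_opts` below is the
/// normal entry point):
///
/// ```ignore
/// let config = CacheRefillConfig {
///     timeout: Duration::from_millis(6000),
///     data_refill_levels: HashSet::from([0, 1]),
///     concurrency: 10,
///     unit: 64,
///     threshold: 0.5,
/// };
/// ```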
#[derive(Debug)]
pub struct CacheRefillConfig {
    /// Overall timeout for one refill task (all deltas of a version update).
    pub timeout: Duration,

    /// Levels for which data cache refill is enabled.
    pub data_refill_levels: HashSet<u32>,

    /// Maximum number of refill unit reads running concurrently.
    pub concurrency: usize,

    /// Number of blocks fetched together as one refill unit.
    pub unit: usize,

    /// Minimum ratio of admitted blocks in a unit for the unit to be refilled.
    pub threshold: f64,
}

impl CacheRefillConfig {
    pub fn from_storage_opts(options: &StorageOpts) -> Self {
        // Data refill requires the licensed `ElasticDiskCache` feature; fall
        // back to refilling no levels when it is unavailable.
        let data_refill_levels = match Feature::ElasticDiskCache.check_available() {
            Ok(_) => options
                .cache_refill_data_refill_levels
                .iter()
                .copied()
                .collect(),
            Err(e) => {
                tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
                HashSet::new()
            }
        };

        Self {
            timeout: Duration::from_millis(options.cache_refill_timeout_ms),
            data_refill_levels,
            concurrency: options.cache_refill_concurrency,
            unit: options.cache_refill_unit,
            threshold: options.cache_refill_threshold,
        }
    }
}

struct Item {
    handle: JoinHandle<()>,
    event: CacheRefillerEvent,
}

pub(crate) type SpawnRefillTask = Arc<
    // (deltas, context, old pinned version, new pinned version)
    dyn Fn(Vec<SstDeltaInfo>, CacheRefillContext, PinnedVersion, PinnedVersion) -> JoinHandle<()>
        + Send
        + Sync
        + 'static,
>;
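/// Schedules refill tasks for version deltas and tracks them in submission
/// (version) order.
///
/// A lifecycle sketch; `opts`, `sstable_store`, `deltas`, and the pinned
/// versions are assumed to come from the surrounding event handler:
///
/// ```ignore
/// let mut refiller = CacheRefiller::new(
///     CacheRefillConfig::from_storage_opts(&opts),
///     sstable_store.clone(),
///     CacheRefiller::default_spawn_refill_task(),
/// );
/// refiller.start_cache_refill(deltas, pinned_version, new_pinned_version);
/// let event = refiller.next_event().await; // oldest task finished
/// ```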
pub(crate) struct CacheRefiller {
    /// In-flight refill tasks, ordered from old to new.
    queue: VecDeque<Item>,

    context: CacheRefillContext,

    spawn_refill_task: SpawnRefillTask,
}

impl CacheRefiller {
    pub(crate) fn new(
        config: CacheRefillConfig,
        sstable_store: SstableStoreRef,
        spawn_refill_task: SpawnRefillTask,
    ) -> Self {
        let config = Arc::new(config);
        let concurrency = Arc::new(Semaphore::new(config.concurrency));
        Self {
            queue: VecDeque::new(),
            context: CacheRefillContext {
                config,
                concurrency,
                sstable_store,
            },
            spawn_refill_task,
        }
    }

    pub(crate) fn default_spawn_refill_task() -> SpawnRefillTask {
        Arc::new(|deltas, context, _, _| {
            let task = CacheRefillTask { deltas, context };
            tokio::spawn(task.run())
        })
    }

    pub(crate) fn start_cache_refill(
        &mut self,
        deltas: Vec<SstDeltaInfo>,
        pinned_version: PinnedVersion,
        new_pinned_version: PinnedVersion,
    ) {
        let handle = (self.spawn_refill_task)(
            deltas,
            self.context.clone(),
            pinned_version.clone(),
            new_pinned_version.clone(),
        );
        let event = CacheRefillerEvent {
            pinned_version,
            new_pinned_version,
        };
        let item = Item { handle, event };
        self.queue.push_back(item);
        GLOBAL_CACHE_REFILL_METRICS.refill_queue_total.add(1);
    }

    pub(crate) fn last_new_pinned_version(&self) -> Option<&PinnedVersion> {
        self.queue.back().map(|item| &item.event.new_pinned_version)
    }
}

impl CacheRefiller {
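    /// Completes with the event of the oldest in-flight refill task once it
    /// finishes; stays pending while the queue is empty.
    ///
    /// A driving-loop sketch; `on_version_applied` is an assumed handler, not
    /// part of this module:
    ///
    /// ```ignore
    /// loop {
    ///     let event = refiller.next_event().await;
    ///     on_version_applied(event.new_pinned_version);
    /// }
    /// ```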
    pub(crate) fn next_event(&mut self) -> impl Future<Output = CacheRefillerEvent> + '_ {
        poll_fn(|cx| {
            if let Some(item) = self.queue.front_mut() {
                // Panics only if the refill task itself panicked or was aborted.
                ready!(item.handle.poll_unpin(cx)).unwrap();
                let item = self.queue.pop_front().unwrap();
                GLOBAL_CACHE_REFILL_METRICS.refill_queue_total.sub(1);
                return Poll::Ready(item.event);
            }
            Poll::Pending
        })
    }
}

pub struct CacheRefillerEvent {
    pub pinned_version: PinnedVersion,
    pub new_pinned_version: PinnedVersion,
}

#[derive(Clone)]
pub(crate) struct CacheRefillContext {
    config: Arc<CacheRefillConfig>,
    concurrency: Arc<Semaphore>,
    sstable_store: SstableStoreRef,
}

struct CacheRefillTask {
    deltas: Vec<SstDeltaInfo>,
    context: CacheRefillContext,
}

impl CacheRefillTask {
    async fn run(self) {
        let tasks = self
            .deltas
            .iter()
            .map(|delta| {
                let context = self.context.clone();
                async move {
                    let holders = match Self::meta_cache_refill(&context, delta).await {
                        Ok(holders) => holders,
                        Err(e) => {
                            tracing::warn!(error = %e.as_report(), "meta cache refill error");
                            return;
                        }
                    };
                    Self::data_cache_refill(&context, delta, holders).await;
                }
            })
            .collect_vec();
        let future = join_all(tasks);

        // Refill is best-effort: drop whatever has not finished when the
        // timeout elapses.
        let _ = tokio::time::timeout(self.context.config.timeout, future).await;
    }

    async fn meta_cache_refill(
        context: &CacheRefillContext,
        delta: &SstDeltaInfo,
    ) -> HummockResult<Vec<TableHolder>> {
        let tasks = delta
            .insert_sst_infos
            .iter()
            .map(|info| async {
                let mut stats = StoreLocalStatistic::default();
                GLOBAL_CACHE_REFILL_METRICS.meta_refill_attempts_total.inc();

                let now = Instant::now();
                let res = context.sstable_store.sstable(info, &mut stats).await;
                // Discard the local statistics collected by this internal read.
                stats.discard();
                GLOBAL_CACHE_REFILL_METRICS
                    .meta_refill_success_duration
                    .observe(now.elapsed().as_secs_f64());
                res
            })
            .collect_vec();
        let holders = try_join_all(tasks).await?;
        Ok(holders)
    }

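    /// Computes which units of the newly inserted ssts to refill by
    /// "inheritance": a unit is selected iff its key range overlaps a block of
    /// a parent (deleted) sst that is present in the data recent filter.
    ///
    /// For each parent block key range `[pleft, pright]`, the overlapping units
    /// form a half-open index range `uleft..uright`, found by binary search
    /// over the units, which are sorted by key range.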
    fn get_units_to_refill_by_inheritance(
        context: &CacheRefillContext,
        ssts: &[TableHolder],
        parent_ssts: impl IntoIterator<Item = HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>>,
    ) -> HashSet<SstableUnit> {
        let mut res = HashSet::default();

        let Some(filter) = context.sstable_store.data_recent_filter() else {
            return res;
        };

        let units = {
            let unit = context.config.unit;
            ssts.iter()
                .flat_map(|sst| {
                    let units = Unit::units(sst, unit);
                    (0..units).map(|uidx| Unit::new(sst, unit, uidx))
                })
                .collect_vec()
        };

        // The binary searches below rely on the units being sorted by key range.
        if cfg!(debug_assertions) {
            units.iter().tuple_windows().for_each(|(a, b)| {
                debug_assert_ne!(
                    KeyComparator::compare_encoded_full_key(a.largest_key(), b.smallest_key()),
                    std::cmp::Ordering::Greater
                )
            });
        }

        for psst in parent_ssts {
            for pblk in 0..psst.block_count() {
                let pleft = &psst.meta.block_metas[pblk].smallest_key;
                let pright = if pblk + 1 == psst.block_count() {
                    // The sst's largest key serves as the inclusive right bound
                    // of its last block.
                    &psst.meta.largest_key
                } else {
                    &psst.meta.block_metas[pblk + 1].smallest_key
                };

                // Index of the first unit whose largest key >= pleft.
                let uleft = units.partition_point(|unit| {
                    KeyComparator::compare_encoded_full_key(unit.largest_key(), pleft)
                        == std::cmp::Ordering::Less
                });
                // Index of the first unit whose smallest key > pright.
                let uright = units.partition_point(|unit| {
                    KeyComparator::compare_encoded_full_key(unit.smallest_key(), pright)
                        != std::cmp::Ordering::Greater
                });

                // Units in `uleft..uright` overlap the parent block.
                for u in units.iter().take(uright).skip(uleft) {
                    let unit = SstableUnit {
                        sst_obj_id: u.sst.id,
                        blks: u.blks.clone(),
                    };
                    if res.contains(&unit) {
                        continue;
                    }
                    if filter.contains(&(psst.id, pblk)) {
                        res.insert(unit);
                    }
                }
            }
        }

        let hit = res.len();
        let miss = units.len() - res.len();
        GLOBAL_CACHE_REFILL_METRICS
            .data_refill_unit_inheritance_hit_total
            .inc_by(hit as u64);
        GLOBAL_CACHE_REFILL_METRICS
            .data_refill_unit_inheritance_miss_total
            .inc_by(miss as u64);

        res
    }

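    /// Refills the data (block) cache for the ssts inserted by `delta`.
    ///
    /// Refill is skipped entirely when the block cache has no disk tier, when
    /// the delta inserts or deletes nothing, when the recent filter is
    /// disabled, when the target level is not configured for refill, or when
    /// none of the deleted ssts is recent.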
    async fn data_cache_refill(
        context: &CacheRefillContext,
        delta: &SstDeltaInfo,
        holders: Vec<TableHolder>,
    ) {
        // Only refill when the block cache is a hybrid (memory + disk) cache.
        if !context.sstable_store.block_cache().is_hybrid() {
            return;
        }

        // No new data to refill, or no old data to inherit from.
        if delta.insert_sst_infos.is_empty() || delta.delete_sst_object_ids.is_empty() {
            return;
        }

        // Skip if the recent filter is disabled.
        let Some(filter) = context.sstable_store.data_recent_filter() else {
            return;
        };

        // Skip levels that are not configured for refill, and deltas none of
        // whose deleted ssts is recent.
        if !context
            .config
            .data_refill_levels
            .contains(&delta.insert_sst_level)
            || !delta
                .delete_sst_object_ids
                .iter()
                .any(|&id| filter.contains(&(id, usize::MAX)))
        {
            GLOBAL_CACHE_REFILL_METRICS.data_refill_filtered_total.inc();
            return;
        }

        GLOBAL_CACHE_REFILL_METRICS
            .data_refill_block_unfiltered_total
            .inc_by(
                holders
                    .iter()
                    .map(|sst| sst.block_count() as u64)
                    .sum::<u64>(),
            );

        if delta.insert_sst_level == 0 {
            Self::data_file_cache_refill_l0_impl(context, delta, holders).await;
        } else {
            Self::data_file_cache_impl(context, delta, holders).await;
        }
    }

    /// For L0, refill every unit of the inserted ssts directly, without
    /// consulting parent ssts.
    async fn data_file_cache_refill_l0_impl(
        context: &CacheRefillContext,
        _delta: &SstDeltaInfo,
        holders: Vec<TableHolder>,
    ) {
        let unit = context.config.unit;

        let mut futures = vec![];

        for sst in &holders {
            for blk_start in (0..sst.block_count()).step_by(unit) {
                let blk_end = std::cmp::min(sst.block_count(), blk_start + unit);
                let unit = SstableUnit {
                    sst_obj_id: sst.id,
                    blks: blk_start..blk_end,
                };
                futures.push(
                    async move { Self::data_file_cache_refill_unit(context, sst, unit).await },
                );
            }
        }
        // Results are ignored; refill is best-effort.
        join_all(futures).await;
    }

    /// For non-L0 levels, look up the deleted (parent) ssts' metadata from the
    /// meta cache and refill only the units inherited from recent parent blocks.
    async fn data_file_cache_impl(
        context: &CacheRefillContext,
        delta: &SstDeltaInfo,
        holders: Vec<TableHolder>,
    ) {
        let sstable_store = context.sstable_store.clone();
        let futures = delta.delete_sst_object_ids.iter().map(|sst_obj_id| {
            let store = &sstable_store;
            async move {
                let res = store.sstable_cached(*sst_obj_id).await;
                match res {
                    Ok(Some(_)) => GLOBAL_CACHE_REFILL_METRICS
                        .data_refill_parent_meta_lookup_hit_total
                        .inc(),
                    Ok(None) => GLOBAL_CACHE_REFILL_METRICS
                        .data_refill_parent_meta_lookup_miss_total
                        .inc(),
                    _ => {}
                }
                res
            }
        });
        let parent_ssts = match try_join_all(futures).await {
            Ok(parent_ssts) => parent_ssts.into_iter().flatten(),
            Err(e) => {
                return tracing::error!(error = %e.as_report(), "get old meta from cache error");
            }
        };
        let units = Self::get_units_to_refill_by_inheritance(context, &holders, parent_ssts);

        let ssts: HashMap<HummockSstableObjectId, TableHolder> =
            holders.into_iter().map(|meta| (meta.id, meta)).collect();
        let futures = units.into_iter().map(|unit| {
            let ssts = &ssts;
            async move {
                let sst = ssts.get(&unit.sst_obj_id).unwrap();
                if let Err(e) = Self::data_file_cache_refill_unit(context, sst, unit).await {
                    tracing::error!(error = %e.as_report(), "data file cache unit refill error");
                }
            }
        });
        join_all(futures).await;
    }

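    /// Refills a single unit: obtains one cache writer per block, and only if
    /// the ratio of admitted writers reaches `threshold` issues a single ranged
    /// read covering the whole unit and inserts the decoded blocks.
    ///
    /// E.g. with a 64-block unit and `threshold = 0.5`, at least 32 blocks must
    /// be admitted by the cache's admission picker for the read to be issued.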
    async fn data_file_cache_refill_unit(
        context: &CacheRefillContext,
        sst: &Sstable,
        unit: SstableUnit,
    ) -> HummockResult<()> {
        let sstable_store = &context.sstable_store;
        let threshold = context.config.threshold;

        // Mark the whole sst as recent; `usize::MAX` stands for the sst itself
        // rather than a concrete block index.
        if let Some(filter) = sstable_store.data_recent_filter() {
            filter.insert((sst.id, usize::MAX));
        }

        let blocks = unit.blks.size().unwrap();

        let mut tasks = vec![];
        let mut contexts = Vec::with_capacity(blocks);
        let mut admits = 0;

        let (range_first, _) = sst.calculate_block_info(unit.blks.start);
        let (range_last, _) = sst.calculate_block_info(unit.blks.end - 1);
        let range = range_first.start..range_last.end;

        GLOBAL_CACHE_REFILL_METRICS
            .data_refill_ideal_bytes
            .inc_by(range.size().unwrap() as u64);

        for blk in unit.blks {
            let (range, uncompressed_capacity) = sst.calculate_block_info(blk);
            let key = SstableBlockIndex {
                sst_id: sst.id,
                block_idx: blk as u64,
            };

            let mut writer = sstable_store.block_cache().storage_writer(key);

            if writer.pick().admitted() {
                admits += 1;
            }

            contexts.push((writer, range, uncompressed_capacity))
        }

        if admits as f64 / contexts.len() as f64 >= threshold {
            let task = async move {
                GLOBAL_CACHE_REFILL_METRICS.data_refill_attempts_total.inc();

                let permit = context.concurrency.acquire().await.unwrap();

                GLOBAL_CACHE_REFILL_METRICS.data_refill_started_total.inc();

                let timer = GLOBAL_CACHE_REFILL_METRICS
                    .data_refill_success_duration
                    .start_timer();

                // One ranged read for the whole unit, then slice out each block.
                let data = sstable_store
                    .store()
                    .read(&sstable_store.get_sst_data_path(sst.id), range.clone())
                    .await?;
                let mut futures = vec![];
                for (w, r, uc) in contexts {
                    let offset = r.start - range.start;
                    let len = r.end - r.start;
                    let bytes = data.slice(offset..offset + len);
                    let future = async move {
                        let value = Box::new(Block::decode(bytes, uc)?);
                        // Force-insert: admission was already decided above.
                        if let Some(_entry) = w.force().insert(value) {
                            GLOBAL_CACHE_REFILL_METRICS
                                .data_refill_success_bytes
                                .inc_by(len as u64);
                            GLOBAL_CACHE_REFILL_METRICS
                                .data_refill_block_success_total
                                .inc();
                        }
                        Ok::<_, HummockError>(())
                    };
                    futures.push(future);
                }
                try_join_all(futures)
                    .await
                    .map_err(HummockError::file_cache)?;

                drop(permit);
                drop(timer);

                Ok::<_, HummockError>(())
            };
            tasks.push(task);
        }

        try_join_all(tasks).await?;

        Ok(())
    }
}

#[derive(Debug)]
pub struct SstableBlock {
    pub sst_obj_id: HummockSstableObjectId,
    pub blk_idx: usize,
}

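/// A contiguous run of blocks `blks` within the sst `sst_obj_id`: the
/// granularity at which data refill is admitted and fetched.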
#[derive(Debug, Hash, PartialEq, Eq)]
pub struct SstableUnit {
    pub sst_obj_id: HummockSstableObjectId,
    pub blks: Range<usize>,
}

impl Ord for SstableUnit {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        match self.sst_obj_id.cmp(&other.sst_obj_id) {
            std::cmp::Ordering::Equal => {}
            ord => return ord,
        }
        match self.blks.start.cmp(&other.blks.start) {
            std::cmp::Ordering::Equal => {}
            ord => return ord,
        }
        self.blks.end.cmp(&other.blks.end)
    }
}

impl PartialOrd for SstableUnit {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

#[derive(Debug)]
struct Unit<'a> {
    sst: &'a Sstable,
    blks: Range<usize>,
}

impl<'a> Unit<'a> {
    fn new(sst: &'a Sstable, unit: usize, uidx: usize) -> Self {
        let blks = unit * uidx..std::cmp::min(unit * (uidx + 1), sst.block_count());
        Self { sst, blks }
    }

    fn smallest_key(&self) -> &Vec<u8> {
        &self.sst.meta.block_metas[self.blks.start].smallest_key
    }

    /// For the last unit of the sst this is the sst's largest key (inclusive);
    /// otherwise it is the smallest key of the first block after the unit.
    fn largest_key(&self) -> &Vec<u8> {
        if self.blks.end == self.sst.block_count() {
            &self.sst.meta.largest_key
        } else {
            &self.sst.meta.block_metas[self.blks.end].smallest_key
        }
    }

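    /// Number of units needed to cover all blocks of `sst`, i.e.
    /// `ceil(block_count / unit)`.
    ///
    /// E.g. 10 blocks with `unit == 4` yield 3 units covering blocks `0..4`,
    /// `4..8`, and `8..10`.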
    fn units(sst: &Sstable, unit: usize) -> usize {
        sst.block_count() / unit + if sst.block_count() % unit == 0 { 0 } else { 1 }
    }
}