use std::collections::{HashMap, HashSet, VecDeque};
use std::future::poll_fn;
use std::ops::Range;
use std::sync::{Arc, LazyLock};
use std::task::{Poll, ready};
use std::time::{Duration, Instant};

use foyer::{HybridCacheEntry, RangeBoundsExt};
use futures::future::{join_all, try_join_all};
use futures::{Future, FutureExt};
use itertools::Itertools;
use prometheus::core::{AtomicU64, GenericCounter, GenericCounterVec};
use prometheus::{
    Histogram, HistogramVec, IntGauge, Registry, register_histogram_vec_with_registry,
    register_int_counter_vec_with_registry, register_int_gauge_with_registry,
};
use risingwave_common::license::Feature;
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo;
use risingwave_hummock_sdk::{HummockSstableObjectId, KeyComparator};
use thiserror_ext::AsReport;
use tokio::sync::Semaphore;
use tokio::task::JoinHandle;

use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::{
    Block, HummockError, HummockResult, RecentFilterTrait, Sstable, SstableBlockIndex,
    SstableStoreRef, TableHolder,
};
use crate::monitor::StoreLocalStatistic;
use crate::opts::StorageOpts;

pub static GLOBAL_CACHE_REFILL_METRICS: LazyLock<CacheRefillMetrics> =
    LazyLock::new(|| CacheRefillMetrics::new(&GLOBAL_METRICS_REGISTRY));

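/// Prometheus metrics for cache refill, registered against the registry passed to
/// [`CacheRefillMetrics::new`] (the global registry for [`GLOBAL_CACHE_REFILL_METRICS`]).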
pub struct CacheRefillMetrics {
    pub refill_duration: HistogramVec,
    pub refill_total: GenericCounterVec<AtomicU64>,
    pub refill_bytes: GenericCounterVec<AtomicU64>,

    pub data_refill_success_duration: Histogram,
    pub meta_refill_success_duration: Histogram,

    pub data_refill_filtered_total: GenericCounter<AtomicU64>,
    pub data_refill_attempts_total: GenericCounter<AtomicU64>,
    pub data_refill_started_total: GenericCounter<AtomicU64>,
    pub meta_refill_attempts_total: GenericCounter<AtomicU64>,

    pub data_refill_parent_meta_lookup_hit_total: GenericCounter<AtomicU64>,
    pub data_refill_parent_meta_lookup_miss_total: GenericCounter<AtomicU64>,
    pub data_refill_unit_inheritance_hit_total: GenericCounter<AtomicU64>,
    pub data_refill_unit_inheritance_miss_total: GenericCounter<AtomicU64>,

    pub data_refill_block_unfiltered_total: GenericCounter<AtomicU64>,
    pub data_refill_block_success_total: GenericCounter<AtomicU64>,

    pub data_refill_ideal_bytes: GenericCounter<AtomicU64>,
    pub data_refill_success_bytes: GenericCounter<AtomicU64>,

    pub refill_queue_total: IntGauge,
}

impl CacheRefillMetrics {
    pub fn new(registry: &Registry) -> Self {
        let refill_duration = register_histogram_vec_with_registry!(
            "refill_duration",
            "refill duration",
            &["type", "op"],
            registry,
        )
        .unwrap();
        let refill_total = register_int_counter_vec_with_registry!(
            "refill_total",
            "refill total",
            &["type", "op"],
            registry,
        )
        .unwrap();
        let refill_bytes = register_int_counter_vec_with_registry!(
            "refill_bytes",
            "refill bytes",
            &["type", "op"],
            registry,
        )
        .unwrap();

        let data_refill_success_duration = refill_duration
            .get_metric_with_label_values(&["data", "success"])
            .unwrap();
        let meta_refill_success_duration = refill_duration
            .get_metric_with_label_values(&["meta", "success"])
            .unwrap();

        let data_refill_filtered_total = refill_total
            .get_metric_with_label_values(&["data", "filtered"])
            .unwrap();
        let data_refill_attempts_total = refill_total
            .get_metric_with_label_values(&["data", "attempts"])
            .unwrap();
        let data_refill_started_total = refill_total
            .get_metric_with_label_values(&["data", "started"])
            .unwrap();
        let meta_refill_attempts_total = refill_total
            .get_metric_with_label_values(&["meta", "attempts"])
            .unwrap();

        let data_refill_parent_meta_lookup_hit_total = refill_total
            .get_metric_with_label_values(&["parent_meta", "hit"])
            .unwrap();
        let data_refill_parent_meta_lookup_miss_total = refill_total
            .get_metric_with_label_values(&["parent_meta", "miss"])
            .unwrap();
        let data_refill_unit_inheritance_hit_total = refill_total
            .get_metric_with_label_values(&["unit_inheritance", "hit"])
            .unwrap();
        let data_refill_unit_inheritance_miss_total = refill_total
            .get_metric_with_label_values(&["unit_inheritance", "miss"])
            .unwrap();

        let data_refill_block_unfiltered_total = refill_total
            .get_metric_with_label_values(&["block", "unfiltered"])
            .unwrap();
        let data_refill_block_success_total = refill_total
            .get_metric_with_label_values(&["block", "success"])
            .unwrap();

        let data_refill_ideal_bytes = refill_bytes
            .get_metric_with_label_values(&["data", "ideal"])
            .unwrap();
        let data_refill_success_bytes = refill_bytes
            .get_metric_with_label_values(&["data", "success"])
            .unwrap();

        let refill_queue_total = register_int_gauge_with_registry!(
            "refill_queue_total",
            "refill queue total",
            registry,
        )
        .unwrap();

        Self {
            refill_duration,
            refill_total,
            refill_bytes,

            data_refill_success_duration,
            meta_refill_success_duration,
            data_refill_filtered_total,
            data_refill_attempts_total,
            data_refill_started_total,
            meta_refill_attempts_total,

            data_refill_parent_meta_lookup_hit_total,
            data_refill_parent_meta_lookup_miss_total,
            data_refill_unit_inheritance_hit_total,
            data_refill_unit_inheritance_miss_total,

            data_refill_block_unfiltered_total,
            data_refill_block_success_total,

            data_refill_ideal_bytes,
            data_refill_success_bytes,

            refill_queue_total,
        }
    }
}

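/// Configuration for cache refill on hummock version updates.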
#[derive(Debug)]
pub struct CacheRefillConfig {
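    /// Timeout for a whole cache refill run; refill futures still pending when it elapses are dropped.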
    pub timeout: Duration,

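    /// Levels for which data (block) cache refill is enabled.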
    pub data_refill_levels: HashSet<u32>,

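    /// Maximum number of refill units fetched concurrently (enforced with a semaphore).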
    pub concurrency: usize,

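    /// Number of blocks fetched together as one refill unit.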
    pub unit: usize,

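    /// Minimum ratio of cache-admitted blocks in a unit required before the unit is actually fetched.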
    pub threshold: f64,

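    /// Whether to bypass the recent filter and refill unconditionally.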
    pub skip_recent_filter: bool,
}

impl CacheRefillConfig {
    pub fn from_storage_opts(options: &StorageOpts) -> Self {
        let data_refill_levels = match Feature::ElasticDiskCache.check_available() {
            Ok(_) => options
                .cache_refill_data_refill_levels
                .iter()
                .copied()
                .collect(),
            Err(e) => {
                tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
                HashSet::new()
            }
        };

        Self {
            timeout: Duration::from_millis(options.cache_refill_timeout_ms),
            data_refill_levels,
            concurrency: options.cache_refill_concurrency,
            unit: options.cache_refill_unit,
            threshold: options.cache_refill_threshold,
            skip_recent_filter: options.cache_refill_skip_recent_filter,
        }
    }
}

struct Item {
    handle: JoinHandle<()>,
    event: CacheRefillerEvent,
}

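/// Factory for refill tasks. The closure receives the sst deltas, the refill context, the
/// current pinned version, and the new pinned version, and returns the spawned task handle.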
pub(crate) type SpawnRefillTask = Arc<
    dyn Fn(Vec<SstDeltaInfo>, CacheRefillContext, PinnedVersion, PinnedVersion) -> JoinHandle<()>
        + Send
        + Sync
        + 'static,
>;

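/// Schedules cache refill tasks on version updates and tracks them in submission order.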
pub(crate) struct CacheRefiller {
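    /// In-flight refill tasks, ordered from the oldest to the newest version change.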
    queue: VecDeque<Item>,

    context: CacheRefillContext,

    spawn_refill_task: SpawnRefillTask,
}

impl CacheRefiller {
    pub(crate) fn new(
        config: CacheRefillConfig,
        sstable_store: SstableStoreRef,
        spawn_refill_task: SpawnRefillTask,
    ) -> Self {
        let config = Arc::new(config);
        let concurrency = Arc::new(Semaphore::new(config.concurrency));
        Self {
            queue: VecDeque::new(),
            context: CacheRefillContext {
                config,
                concurrency,
                sstable_store,
            },
            spawn_refill_task,
        }
    }

    pub(crate) fn default_spawn_refill_task() -> SpawnRefillTask {
        Arc::new(|deltas, context, _, _| {
            let task = CacheRefillTask { deltas, context };
            tokio::spawn(task.run())
        })
    }

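    /// Spawn a refill task for the given deltas and enqueue it together with the version pair.
    ///
    /// Minimal usage sketch (hypothetical wiring; in practice the hummock event handler drives this):
    ///
    /// ```ignore
    /// let mut refiller = CacheRefiller::new(
    ///     CacheRefillConfig::from_storage_opts(&storage_opts),
    ///     sstable_store.clone(),
    ///     CacheRefiller::default_spawn_refill_task(),
    /// );
    /// refiller.start_cache_refill(deltas, pinned_version, new_pinned_version);
    /// // Resolves once the oldest in-flight refill task finishes.
    /// let event = refiller.next_event().await;
    /// ```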
    pub(crate) fn start_cache_refill(
        &mut self,
        deltas: Vec<SstDeltaInfo>,
        pinned_version: PinnedVersion,
        new_pinned_version: PinnedVersion,
    ) {
        let handle = (self.spawn_refill_task)(
            deltas,
            self.context.clone(),
            pinned_version.clone(),
            new_pinned_version.clone(),
        );
        let event = CacheRefillerEvent {
            pinned_version,
            new_pinned_version,
        };
        let item = Item { handle, event };
        self.queue.push_back(item);
        GLOBAL_CACHE_REFILL_METRICS.refill_queue_total.add(1);
    }

    pub(crate) fn last_new_pinned_version(&self) -> Option<&PinnedVersion> {
        self.queue.back().map(|item| &item.event.new_pinned_version)
    }
}

impl CacheRefiller {
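    /// Wait for the oldest in-flight refill task to finish, dequeue it, and return its event.
    /// The returned future is pending while the queue is empty.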
    pub(crate) fn next_event(&mut self) -> impl Future<Output = CacheRefillerEvent> + '_ {
        poll_fn(|cx| {
            if let Some(item) = self.queue.front_mut() {
                ready!(item.handle.poll_unpin(cx)).unwrap();
                let item = self.queue.pop_front().unwrap();
                GLOBAL_CACHE_REFILL_METRICS.refill_queue_total.sub(1);
                return Poll::Ready(item.event);
            }
            Poll::Pending
        })
    }
}

pub struct CacheRefillerEvent {
    pub pinned_version: PinnedVersion,
    pub new_pinned_version: PinnedVersion,
}

#[derive(Clone)]
pub(crate) struct CacheRefillContext {
    config: Arc<CacheRefillConfig>,
    concurrency: Arc<Semaphore>,
    sstable_store: SstableStoreRef,
}

struct CacheRefillTask {
    deltas: Vec<SstDeltaInfo>,
    context: CacheRefillContext,
}

impl CacheRefillTask {
    async fn run(self) {
        let tasks = self
            .deltas
            .iter()
            .map(|delta| {
                let context = self.context.clone();
                async move {
                    let holders = match Self::meta_cache_refill(&context, delta).await {
                        Ok(holders) => holders,
                        Err(e) => {
                            tracing::warn!(error = %e.as_report(), "meta cache refill error");
                            return;
                        }
                    };
                    Self::data_cache_refill(&context, delta, holders).await;
                }
            })
            .collect_vec();
        let future = join_all(tasks);

        let _ = tokio::time::timeout(self.context.config.timeout, future).await;
    }

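    /// Warm the meta cache for the newly inserted ssts of `delta` and return their table holders.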
    async fn meta_cache_refill(
        context: &CacheRefillContext,
        delta: &SstDeltaInfo,
    ) -> HummockResult<Vec<TableHolder>> {
        let tasks = delta
            .insert_sst_infos
            .iter()
            .map(|info| async {
                let mut stats = StoreLocalStatistic::default();
                GLOBAL_CACHE_REFILL_METRICS.meta_refill_attempts_total.inc();

                let now = Instant::now();
                let res = context.sstable_store.sstable(info, &mut stats).await;
                stats.discard();
                GLOBAL_CACHE_REFILL_METRICS
                    .meta_refill_success_duration
                    .observe(now.elapsed().as_secs_f64());
                res
            })
            .collect_vec();
        let holders = try_join_all(tasks).await?;
        Ok(holders)
    }

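    /// Compute the refill units of the new `ssts` whose key ranges overlap blocks of the parent
    /// (deleted) ssts that are present in the recent filter, or all overlapping units when
    /// `skip_recent_filter` is set.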
    fn get_units_to_refill_by_inheritance(
        context: &CacheRefillContext,
        ssts: &[TableHolder],
        parent_ssts: impl IntoIterator<Item = HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>>,
    ) -> HashSet<SstableUnit> {
        let mut res = HashSet::default();

        let recent_filter = context.sstable_store.recent_filter();

        let units = {
            let unit = context.config.unit;
            ssts.iter()
                .flat_map(|sst| {
                    let units = Unit::units(sst, unit);
                    (0..units).map(|uidx| Unit::new(sst, unit, uidx))
                })
                .collect_vec()
        };

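        // The binary searches below rely on `units` being sorted by key range; verify in debug builds.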
        if cfg!(debug_assertions) {
            units.iter().tuple_windows().for_each(|(a, b)| {
                debug_assert_ne!(
                    KeyComparator::compare_encoded_full_key(a.largest_key(), b.smallest_key()),
                    std::cmp::Ordering::Greater
                )
            });
        }

        for psst in parent_ssts {
            for pblk in 0..psst.block_count() {
                let pleft = &psst.meta.block_metas[pblk].smallest_key;
                let pright = if pblk + 1 == psst.block_count() {
                    &psst.meta.largest_key
                } else {
                    &psst.meta.block_metas[pblk + 1].smallest_key
                };

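                // Binary-search for the first and last new-sst units overlapping this parent block's key range.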
                let uleft = units.partition_point(|unit| {
                    KeyComparator::compare_encoded_full_key(unit.largest_key(), pleft)
                        == std::cmp::Ordering::Less
                });
                let uright = units.partition_point(|unit| {
                    KeyComparator::compare_encoded_full_key(unit.smallest_key(), pright)
                        != std::cmp::Ordering::Greater
                });

                for u in units.iter().take(uright).skip(uleft) {
                    let unit = SstableUnit {
                        sst_obj_id: u.sst.id,
                        blks: u.blks.clone(),
                    };
                    if res.contains(&unit) {
                        continue;
                    }
                    if context.config.skip_recent_filter || recent_filter.contains(&(psst.id, pblk))
                    {
                        res.insert(unit);
                    }
                }
            }
        }

        let hit = res.len();
        let miss = units.len() - res.len();
        GLOBAL_CACHE_REFILL_METRICS
            .data_refill_unit_inheritance_hit_total
            .inc_by(hit as u64);
        GLOBAL_CACHE_REFILL_METRICS
            .data_refill_unit_inheritance_miss_total
            .inc_by(miss as u64);

        res
    }

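    /// Refill the data (block) cache for the newly inserted ssts of `delta`, subject to the
    /// hybrid-cache, level, and recent-filter checks below.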
    async fn data_cache_refill(
        context: &CacheRefillContext,
        delta: &SstDeltaInfo,
        holders: Vec<TableHolder>,
    ) {
        if !context.sstable_store.block_cache().is_hybrid() {
            return;
        }

        if delta.insert_sst_infos.is_empty() || delta.delete_sst_object_ids.is_empty() {
            return;
        }

        if !context
            .config
            .data_refill_levels
            .contains(&delta.insert_sst_level)
        {
            return;
        }

        let recent_filter = context.sstable_store.recent_filter();

        let targets = delta
            .delete_sst_object_ids
            .iter()
            .map(|id| (*id, usize::MAX))
            .collect_vec();
        if !context.config.skip_recent_filter && !recent_filter.contains_any(targets.iter()) {
            GLOBAL_CACHE_REFILL_METRICS
                .data_refill_filtered_total
                .inc_by(delta.delete_sst_object_ids.len() as _);
            return;
        }

        GLOBAL_CACHE_REFILL_METRICS
            .data_refill_block_unfiltered_total
            .inc_by(
                holders
                    .iter()
                    .map(|sst| sst.block_count() as u64)
                    .sum::<u64>(),
            );

        if delta.insert_sst_level == 0 || context.config.skip_recent_filter {
            Self::data_file_cache_refill_full_impl(context, delta, holders).await;
        } else {
            Self::data_file_cache_impl(context, delta, holders).await;
        }
    }

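    /// Refill every block of the new ssts, split into units of `config.unit` blocks each.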
    async fn data_file_cache_refill_full_impl(
        context: &CacheRefillContext,
        _delta: &SstDeltaInfo,
        holders: Vec<TableHolder>,
    ) {
        let unit = context.config.unit;

        let mut futures = vec![];

        for sst in &holders {
            for blk_start in (0..sst.block_count()).step_by(unit) {
                let blk_end = std::cmp::min(sst.block_count(), blk_start + unit);
                let unit = SstableUnit {
                    sst_obj_id: sst.id,
                    blks: blk_start..blk_end,
                };
                futures.push(
                    async move { Self::data_file_cache_refill_unit(context, sst, unit).await },
                );
            }
        }
        join_all(futures).await;
    }

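    /// Refill only the units of the new ssts that are inherited from the deleted parent ssts,
    /// based on the parents' cached metadata and the recent filter.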
    async fn data_file_cache_impl(
        context: &CacheRefillContext,
        delta: &SstDeltaInfo,
        holders: Vec<TableHolder>,
    ) {
        let sstable_store = context.sstable_store.clone();
        let futures = delta.delete_sst_object_ids.iter().map(|sst_obj_id| {
            let store = &sstable_store;
            async move {
                let res = store.sstable_cached(*sst_obj_id).await;
                match res {
                    Ok(Some(_)) => GLOBAL_CACHE_REFILL_METRICS
                        .data_refill_parent_meta_lookup_hit_total
                        .inc(),
                    Ok(None) => GLOBAL_CACHE_REFILL_METRICS
                        .data_refill_parent_meta_lookup_miss_total
                        .inc(),
                    _ => {}
                }
                res
            }
        });
        let parent_ssts = match try_join_all(futures).await {
            Ok(parent_ssts) => parent_ssts.into_iter().flatten(),
            Err(e) => {
                return tracing::error!(error = %e.as_report(), "get old meta from cache error");
            }
        };
        let units = Self::get_units_to_refill_by_inheritance(context, &holders, parent_ssts);

        let ssts: HashMap<HummockSstableObjectId, TableHolder> =
            holders.into_iter().map(|meta| (meta.id, meta)).collect();
        let futures = units.into_iter().map(|unit| {
            let ssts = &ssts;
            async move {
                let sst = ssts.get(&unit.sst_obj_id).unwrap();
                if let Err(e) = Self::data_file_cache_refill_unit(context, sst, unit).await {
                    tracing::error!(error = %e.as_report(), "data file cache unit refill error");
                }
            }
        });
        join_all(futures).await;
    }

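    /// Read one unit (a contiguous range of blocks) of `sst` from the underlying store and
    /// insert its blocks into the block cache, provided the admission ratio of the unit meets
    /// the configured threshold.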
    async fn data_file_cache_refill_unit(
        context: &CacheRefillContext,
        sst: &Sstable,
        unit: SstableUnit,
    ) -> HummockResult<()> {
        let sstable_store = &context.sstable_store;
        let threshold = context.config.threshold;
        let recent_filter = sstable_store.recent_filter();

        recent_filter.insert((sst.id, usize::MAX));

        let blocks = unit.blks.size().unwrap();

        let mut tasks = vec![];
        let mut contexts = Vec::with_capacity(blocks);
        let mut admits = 0;

        let (range_first, _) = sst.calculate_block_info(unit.blks.start);
        let (range_last, _) = sst.calculate_block_info(unit.blks.end - 1);
        let range = range_first.start..range_last.end;

        let size = range.size().unwrap();

        GLOBAL_CACHE_REFILL_METRICS
            .data_refill_ideal_bytes
            .inc_by(size as _);

        for blk in unit.blks {
            let (range, uncompressed_capacity) = sst.calculate_block_info(blk);
            let key = SstableBlockIndex {
                sst_id: sst.id,
                block_idx: blk as u64,
            };

            let mut writer = sstable_store.block_cache().storage_writer(key);

            if writer.filter(size).is_admitted() {
                admits += 1;
            }

            contexts.push((writer, range, uncompressed_capacity))
        }

        if admits as f64 / contexts.len() as f64 >= threshold {
            let task = async move {
                GLOBAL_CACHE_REFILL_METRICS.data_refill_attempts_total.inc();

                let permit = context.concurrency.acquire().await.unwrap();

                GLOBAL_CACHE_REFILL_METRICS.data_refill_started_total.inc();

                let timer = GLOBAL_CACHE_REFILL_METRICS
                    .data_refill_success_duration
                    .start_timer();

                let data = sstable_store
                    .store()
                    .read(&sstable_store.get_sst_data_path(sst.id), range.clone())
                    .await?;
                let mut futures = vec![];
                for (w, r, uc) in contexts {
                    let offset = r.start - range.start;
                    let len = r.end - r.start;
                    let bytes = data.slice(offset..offset + len);
                    let future = async move {
                        let value = Box::new(Block::decode(bytes, uc)?);
                        if let Some(_entry) = w.force().insert(value) {
                            GLOBAL_CACHE_REFILL_METRICS
                                .data_refill_success_bytes
                                .inc_by(len as u64);
                            GLOBAL_CACHE_REFILL_METRICS
                                .data_refill_block_success_total
                                .inc();
                        }
                        Ok::<_, HummockError>(())
                    };
                    futures.push(future);
                }
                try_join_all(futures)
                    .await
                    .map_err(HummockError::file_cache)?;

                drop(permit);
                drop(timer);

                Ok::<_, HummockError>(())
            };
            tasks.push(task);
        }

        try_join_all(tasks).await?;

        Ok(())
    }
}

#[derive(Debug)]
pub struct SstableBlock {
    pub sst_obj_id: HummockSstableObjectId,
    pub blk_idx: usize,
}

#[derive(Debug, Hash, PartialEq, Eq)]
pub struct SstableUnit {
    pub sst_obj_id: HummockSstableObjectId,
    pub blks: Range<usize>,
}

impl Ord for SstableUnit {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        match self.sst_obj_id.cmp(&other.sst_obj_id) {
            std::cmp::Ordering::Equal => {}
            ord => return ord,
        }
        match self.blks.start.cmp(&other.blks.start) {
            std::cmp::Ordering::Equal => {}
            ord => return ord,
        }
        self.blks.end.cmp(&other.blks.end)
    }
}

impl PartialOrd for SstableUnit {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

#[derive(Debug)]
struct Unit<'a> {
    sst: &'a Sstable,
    blks: Range<usize>,
}

impl<'a> Unit<'a> {
    fn new(sst: &'a Sstable, unit: usize, uidx: usize) -> Self {
        let blks = unit * uidx..std::cmp::min(unit * (uidx + 1), sst.block_count());
        Self { sst, blks }
    }

    fn smallest_key(&self) -> &Vec<u8> {
        &self.sst.meta.block_metas[self.blks.start].smallest_key
    }

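    /// Exclusive upper bound of the unit: the smallest key of the first block after the unit,
    /// or the sst's largest key if the unit ends at the last block.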
    fn largest_key(&self) -> &Vec<u8> {
        if self.blks.end == self.sst.block_count() {
            &self.sst.meta.largest_key
        } else {
            &self.sst.meta.block_metas[self.blks.end].smallest_key
        }
    }

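    /// Number of units (of `unit` blocks each) needed to cover all blocks of `sst`,
    /// i.e. `block_count / unit` rounded up.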
    fn units(sst: &Sstable, unit: usize) -> usize {
        sst.block_count() / unit + if sst.block_count() % unit == 0 { 0 } else { 1 }
    }
}