risingwave_storage/hummock/event_handler/refiller.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet, VecDeque};
16use std::future::poll_fn;
17use std::ops::Range;
18use std::sync::{Arc, LazyLock};
19use std::task::Poll;
20use std::time::{Duration, Instant};
21
22use foyer::{HybridCacheEntry, RangeBoundsExt};
23use futures::future::{join_all, try_join_all};
24use futures::{Future, FutureExt};
25use itertools::Itertools;
26use prometheus::core::{AtomicU64, GenericCounter, GenericCounterVec};
27use prometheus::{
28    Histogram, HistogramVec, IntGauge, Registry, register_histogram_vec_with_registry,
29    register_int_counter_vec_with_registry, register_int_gauge_with_registry,
30};
31use risingwave_common::license::Feature;
32use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
33use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo;
34use risingwave_hummock_sdk::{HummockSstableObjectId, KeyComparator};
35use thiserror_ext::AsReport;
36use tokio::sync::Semaphore;
37use tokio::task::JoinHandle;
38
39use crate::hummock::local_version::pinned_version::PinnedVersion;
40use crate::hummock::{
41    Block, HummockError, HummockResult, RecentFilterTrait, Sstable, SstableBlockIndex,
42    SstableStoreRef, TableHolder,
43};
44use crate::monitor::StoreLocalStatistic;
45use crate::opts::StorageOpts;
46
/// Process-wide cache refill metrics, registered lazily against the global
/// metrics registry on first access.
pub static GLOBAL_CACHE_REFILL_METRICS: LazyLock<CacheRefillMetrics> =
    LazyLock::new(|| CacheRefillMetrics::new(&GLOBAL_METRICS_REGISTRY));
49
/// Prometheus metrics for meta/data cache refill.
///
/// The individual counters/histograms below are label-bound views of the
/// three shared families (`refill_duration`, `refill_total`, `refill_bytes`).
pub struct CacheRefillMetrics {
    /// Refill duration histogram family, labeled by (`type`, `op`).
    pub refill_duration: HistogramVec,
    /// Refill event counter family, labeled by (`type`, `op`).
    pub refill_total: GenericCounterVec<AtomicU64>,
    /// Refill byte counter family, labeled by (`type`, `op`).
    pub refill_bytes: GenericCounterVec<AtomicU64>,

    // Label-bound views of `refill_duration`.
    pub data_refill_success_duration: Histogram,
    pub meta_refill_success_duration: Histogram,

    // Label-bound views of `refill_total` for data/meta refill progress.
    pub data_refill_filtered_total: GenericCounter<AtomicU64>,
    pub data_refill_attempts_total: GenericCounter<AtomicU64>,
    pub data_refill_started_total: GenericCounter<AtomicU64>,
    pub meta_refill_attempts_total: GenericCounter<AtomicU64>,

    // Label-bound views of `refill_total` for unit-inheritance lookups.
    pub data_refill_parent_meta_lookup_hit_total: GenericCounter<AtomicU64>,
    pub data_refill_parent_meta_lookup_miss_total: GenericCounter<AtomicU64>,
    pub data_refill_unit_inheritance_hit_total: GenericCounter<AtomicU64>,
    pub data_refill_unit_inheritance_miss_total: GenericCounter<AtomicU64>,

    // Label-bound views of `refill_total` for per-block outcomes.
    pub data_refill_block_unfiltered_total: GenericCounter<AtomicU64>,
    pub data_refill_block_success_total: GenericCounter<AtomicU64>,

    // Label-bound views of `refill_bytes`.
    pub data_refill_ideal_bytes: GenericCounter<AtomicU64>,
    pub data_refill_success_bytes: GenericCounter<AtomicU64>,

    /// Number of refill tasks currently queued in the refiller.
    pub refill_queue_total: IntGauge,
}
76
impl CacheRefillMetrics {
    /// Register all refill metrics against `registry`.
    ///
    /// The label-bound fields are resolved once here so hot paths can bump
    /// them without repeated label lookups.
    pub fn new(registry: &Registry) -> Self {
        // Shared metric families, labeled by (`type`, `op`).
        let refill_duration = register_histogram_vec_with_registry!(
            "refill_duration",
            "refill duration",
            &["type", "op"],
            registry,
        )
        .unwrap();
        let refill_total = register_int_counter_vec_with_registry!(
            "refill_total",
            "refill total",
            &["type", "op"],
            registry,
        )
        .unwrap();
        let refill_bytes = register_int_counter_vec_with_registry!(
            "refill_bytes",
            "refill bytes",
            &["type", "op"],
            registry,
        )
        .unwrap();

        // Pre-resolved duration views.
        let data_refill_success_duration = refill_duration
            .get_metric_with_label_values(&["data", "success"])
            .unwrap();
        let meta_refill_success_duration = refill_duration
            .get_metric_with_label_values(&["meta", "success"])
            .unwrap();

        // Pre-resolved refill-progress counters.
        let data_refill_filtered_total = refill_total
            .get_metric_with_label_values(&["data", "filtered"])
            .unwrap();
        let data_refill_attempts_total = refill_total
            .get_metric_with_label_values(&["data", "attempts"])
            .unwrap();
        let data_refill_started_total = refill_total
            .get_metric_with_label_values(&["data", "started"])
            .unwrap();
        let meta_refill_attempts_total = refill_total
            .get_metric_with_label_values(&["meta", "attempts"])
            .unwrap();

        // Pre-resolved inheritance-lookup counters.
        let data_refill_parent_meta_lookup_hit_total = refill_total
            .get_metric_with_label_values(&["parent_meta", "hit"])
            .unwrap();
        let data_refill_parent_meta_lookup_miss_total = refill_total
            .get_metric_with_label_values(&["parent_meta", "miss"])
            .unwrap();
        let data_refill_unit_inheritance_hit_total = refill_total
            .get_metric_with_label_values(&["unit_inheritance", "hit"])
            .unwrap();
        let data_refill_unit_inheritance_miss_total = refill_total
            .get_metric_with_label_values(&["unit_inheritance", "miss"])
            .unwrap();

        // Pre-resolved per-block counters.
        let data_refill_block_unfiltered_total = refill_total
            .get_metric_with_label_values(&["block", "unfiltered"])
            .unwrap();
        let data_refill_block_success_total = refill_total
            .get_metric_with_label_values(&["block", "success"])
            .unwrap();

        // Pre-resolved byte counters.
        let data_refill_ideal_bytes = refill_bytes
            .get_metric_with_label_values(&["data", "ideal"])
            .unwrap();
        let data_refill_success_bytes = refill_bytes
            .get_metric_with_label_values(&["data", "success"])
            .unwrap();

        let refill_queue_total = register_int_gauge_with_registry!(
            "refill_queue_total",
            "refill queue total",
            registry,
        )
        .unwrap();

        Self {
            refill_duration,
            refill_total,
            refill_bytes,

            data_refill_success_duration,
            meta_refill_success_duration,
            data_refill_filtered_total,
            data_refill_attempts_total,
            data_refill_started_total,
            meta_refill_attempts_total,

            data_refill_parent_meta_lookup_hit_total,
            data_refill_parent_meta_lookup_miss_total,
            data_refill_unit_inheritance_hit_total,
            data_refill_unit_inheritance_miss_total,

            data_refill_block_unfiltered_total,
            data_refill_block_success_total,

            data_refill_ideal_bytes,
            data_refill_success_bytes,

            refill_queue_total,
        }
    }
}
182
/// Configuration for meta/data cache refill.
#[derive(Debug)]
pub struct CacheRefillConfig {
    /// Cache refill timeout.
    pub timeout: Duration,

    /// Data file cache refill levels.
    pub data_refill_levels: HashSet<u32>,

    /// Meta file cache refill concurrency.
    pub meta_refill_concurrency: usize,

    /// Data file cache refill concurrency.
    pub concurrency: usize,

    /// Data file cache refill unit (blocks).
    pub unit: usize,

    /// Data file cache refill unit threshold.
    ///
    /// Only units whose admit rate > threshold will be refilled.
    pub threshold: f64,

    /// Skip recent filter.
    pub skip_recent_filter: bool,
}
208
209impl CacheRefillConfig {
210    pub fn from_storage_opts(options: &StorageOpts) -> Self {
211        let data_refill_levels = match Feature::ElasticDiskCache.check_available() {
212            Ok(_) => options
213                .cache_refill_data_refill_levels
214                .iter()
215                .copied()
216                .collect(),
217            Err(e) => {
218                tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
219                HashSet::new()
220            }
221        };
222
223        Self {
224            timeout: Duration::from_millis(options.cache_refill_timeout_ms),
225            data_refill_levels,
226            concurrency: options.cache_refill_concurrency,
227            meta_refill_concurrency: options.cache_refill_meta_refill_concurrency,
228            unit: options.cache_refill_unit,
229            threshold: options.cache_refill_threshold,
230            skip_recent_filter: options.cache_refill_skip_recent_filter,
231        }
232    }
233}
234
/// A queued refill task together with the event to emit once it finishes.
struct Item {
    /// Join handle of the spawned refill task.
    handle: JoinHandle<()>,
    /// Event yielded from `CacheRefiller::next_events` on completion.
    event: CacheRefillerEvent,
}
239
/// Factory that spawns a cache refill task for a version delta.
///
/// Arguments: the sst delta infos, the shared refill context, and the two
/// pinned versions as documented below. Injectable so tests can substitute
/// the spawning behavior.
pub(crate) type SpawnRefillTask = Arc<
    // first current version, second new version
    dyn Fn(Vec<SstDeltaInfo>, CacheRefillContext, PinnedVersion, PinnedVersion) -> JoinHandle<()>
        + Send
        + Sync
        + 'static,
>;
247
/// A cache refiller for hummock data.
///
/// Refill tasks are queued in start order; their completion events are
/// drained, still in order, via `next_events`.
pub(crate) struct CacheRefiller {
    /// order: old => new
    queue: VecDeque<Item>,

    /// Shared state handed to every spawned refill task.
    context: CacheRefillContext,

    /// Hook used to spawn refill tasks (see `default_spawn_refill_task`).
    spawn_refill_task: SpawnRefillTask,
}
257
258impl CacheRefiller {
259    pub(crate) fn new(
260        config: CacheRefillConfig,
261        sstable_store: SstableStoreRef,
262        spawn_refill_task: SpawnRefillTask,
263    ) -> Self {
264        let config = Arc::new(config);
265        let concurrency = Arc::new(Semaphore::new(config.concurrency));
266        let meta_refill_concurrency = if config.meta_refill_concurrency == 0 {
267            None
268        } else {
269            Some(Arc::new(Semaphore::new(config.meta_refill_concurrency)))
270        };
271        Self {
272            queue: VecDeque::new(),
273            context: CacheRefillContext {
274                config,
275                meta_refill_concurrency,
276                concurrency,
277                sstable_store,
278            },
279            spawn_refill_task,
280        }
281    }
282
283    pub(crate) fn default_spawn_refill_task() -> SpawnRefillTask {
284        Arc::new(|deltas, context, _, _| {
285            let task = CacheRefillTask { deltas, context };
286            tokio::spawn(task.run())
287        })
288    }
289
290    pub(crate) fn start_cache_refill(
291        &mut self,
292        deltas: Vec<SstDeltaInfo>,
293        pinned_version: PinnedVersion,
294        new_pinned_version: PinnedVersion,
295    ) {
296        let handle = (self.spawn_refill_task)(
297            deltas,
298            self.context.clone(),
299            pinned_version.clone(),
300            new_pinned_version.clone(),
301        );
302        let event = CacheRefillerEvent {
303            pinned_version,
304            new_pinned_version,
305        };
306        let item = Item { handle, event };
307        self.queue.push_back(item);
308        GLOBAL_CACHE_REFILL_METRICS.refill_queue_total.add(1);
309    }
310
311    pub(crate) fn last_new_pinned_version(&self) -> Option<&PinnedVersion> {
312        self.queue.back().map(|item| &item.event.new_pinned_version)
313    }
314}
315
impl CacheRefiller {
    /// Wait until at least one queued refill task finishes and return the
    /// finished events in queue (start) order, batching up to 16 per wakeup.
    pub(crate) fn next_events(&mut self) -> impl Future<Output = Vec<CacheRefillerEvent>> + '_ {
        poll_fn(|cx| {
            const MAX_BATCH_SIZE: usize = 16;
            let mut events = None;
            // Only the front of the queue may complete out: events must be
            // yielded in the order the refills were started.
            while let Some(item) = self.queue.front_mut()
                && let Poll::Ready(result) = item.handle.poll_unpin(cx)
            {
                // A join error means the refill task panicked; propagate it.
                result.unwrap();
                let item = self.queue.pop_front().unwrap();
                GLOBAL_CACHE_REFILL_METRICS.refill_queue_total.sub(1);
                let events = events.get_or_insert_with(|| Vec::with_capacity(MAX_BATCH_SIZE));
                events.push(item.event);
                if events.len() >= MAX_BATCH_SIZE {
                    break;
                }
            }
            // Pending only when not a single task has completed; otherwise
            // return whatever batch was collected.
            if let Some(events) = events {
                Poll::Ready(events)
            } else {
                Poll::Pending
            }
        })
    }
}
341
/// Completion event of a refill task, carrying the version switch the task
/// was spawned for.
pub struct CacheRefillerEvent {
    /// The pinned version before the switch.
    pub pinned_version: PinnedVersion,
    /// The pinned version after the switch.
    pub new_pinned_version: PinnedVersion,
}
346
/// Shared state handed to every spawned refill task.
#[derive(Clone)]
pub(crate) struct CacheRefillContext {
    config: Arc<CacheRefillConfig>,
    /// Limits concurrent meta refills; `None` means no limit is enforced.
    meta_refill_concurrency: Option<Arc<Semaphore>>,
    /// Limits concurrent data refill unit downloads.
    concurrency: Arc<Semaphore>,
    sstable_store: SstableStoreRef,
}
354
/// One refill job covering the sst deltas of a single version switch.
struct CacheRefillTask {
    deltas: Vec<SstDeltaInfo>,
    context: CacheRefillContext,
}
359
impl CacheRefillTask {
    /// Refill caches for every delta concurrently: meta cache first, then
    /// data cache. The whole batch is bounded by the configured timeout, and
    /// any unfinished refill work is simply dropped (best-effort).
    async fn run(self) {
        let tasks = self
            .deltas
            .iter()
            .map(|delta| {
                let context = self.context.clone();
                async move {
                    // Data refill needs the sstable metas, so a failed meta
                    // refill skips data refill for this delta.
                    let holders = match Self::meta_cache_refill(&context, delta).await {
                        Ok(holders) => holders,
                        Err(e) => {
                            tracing::warn!(error = %e.as_report(), "meta cache refill error");
                            return;
                        }
                    };
                    Self::data_cache_refill(&context, delta, holders).await;
                }
            })
            .collect_vec();
        let future = join_all(tasks);

        let _ = tokio::time::timeout(self.context.config.timeout, future).await;
    }

    /// Fetch (and thereby cache) the metas of all ssts inserted by `delta`,
    /// optionally limited by the meta refill semaphore. Fails fast on the
    /// first error.
    async fn meta_cache_refill(
        context: &CacheRefillContext,
        delta: &SstDeltaInfo,
    ) -> HummockResult<Vec<TableHolder>> {
        let tasks = delta
            .insert_sst_infos
            .iter()
            .map(|info| async {
                let mut stats = StoreLocalStatistic::default();
                GLOBAL_CACHE_REFILL_METRICS.meta_refill_attempts_total.inc();

                // `None` semaphore means unbounded meta refill concurrency.
                let permit = if let Some(c) = &context.meta_refill_concurrency {
                    Some(c.acquire().await.unwrap())
                } else {
                    None
                };

                let now = Instant::now();
                let res = context.sstable_store.sstable(info, &mut stats).await;
                // NOTE(review): presumably keeps refill traffic out of the
                // regular read statistics — confirm `discard` semantics.
                stats.discard();
                GLOBAL_CACHE_REFILL_METRICS
                    .meta_refill_success_duration
                    .observe(now.elapsed().as_secs_f64());
                drop(permit);

                res
            })
            .collect_vec();
        let holders = try_join_all(tasks).await?;
        Ok(holders)
    }

    /// Get sstable inheritance info in unit level.
    ///
    /// For every block of every parent (deleted) sst, find the units of the
    /// new ssts whose key range overlaps it; a unit is selected when the
    /// parent block is present in the recent filter (or filtering is skipped).
    fn get_units_to_refill_by_inheritance(
        context: &CacheRefillContext,
        ssts: &[TableHolder],
        parent_ssts: impl IntoIterator<Item = HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>>,
    ) -> HashSet<SstableUnit> {
        let mut res = HashSet::default();

        let recent_filter = context.sstable_store.recent_filter();

        // All units of the new ssts, in sst order (and block order within
        // each sst).
        let units = {
            let unit = context.config.unit;
            ssts.iter()
                .flat_map(|sst| {
                    let units = Unit::units(sst, unit);
                    (0..units).map(|uidx| Unit::new(sst, unit, uidx))
                })
                .collect_vec()
        };

        if cfg!(debug_assertions) {
            // assert units in asc order
            units.iter().tuple_windows().for_each(|(a, b)| {
                debug_assert_ne!(
                    KeyComparator::compare_encoded_full_key(a.largest_key(), b.smallest_key()),
                    std::cmp::Ordering::Greater
                )
            });
        }

        for psst in parent_ssts {
            for pblk in 0..psst.block_count() {
                // Key range of the parent block: [pleft, pright].
                let pleft = &psst.meta.block_metas[pblk].smallest_key;
                let pright = if pblk + 1 == psst.block_count() {
                    // `largest_key` can be included or excluded, both are treated as included here
                    &psst.meta.largest_key
                } else {
                    &psst.meta.block_metas[pblk + 1].smallest_key
                };

                // partition point: unit.right < pblk.left
                let uleft = units.partition_point(|unit| {
                    KeyComparator::compare_encoded_full_key(unit.largest_key(), pleft)
                        == std::cmp::Ordering::Less
                });
                // partition point: unit.left <= pblk.right
                let uright = units.partition_point(|unit| {
                    KeyComparator::compare_encoded_full_key(unit.smallest_key(), pright)
                        != std::cmp::Ordering::Greater
                });

                // overlapping: uleft..uright
                for u in units.iter().take(uright).skip(uleft) {
                    let unit = SstableUnit {
                        sst_obj_id: u.sst.id,
                        blks: u.blks.clone(),
                    };
                    if res.contains(&unit) {
                        continue;
                    }
                    // Inherit the unit when the overlapping parent block was
                    // recently accessed (or when filtering is disabled).
                    if context.config.skip_recent_filter || recent_filter.contains(&(psst.id, pblk))
                    {
                        res.insert(unit);
                    }
                }
            }
        }

        let hit = res.len();
        let miss = units.len() - res.len();
        GLOBAL_CACHE_REFILL_METRICS
            .data_refill_unit_inheritance_hit_total
            .inc_by(hit as u64);
        GLOBAL_CACHE_REFILL_METRICS
            .data_refill_unit_inheritance_miss_total
            .inc_by(miss as u64);

        res
    }

    /// Data cache refill entry point.
    ///
    /// Applies a series of cheap filters before refilling: hybrid cache
    /// enabled, non-empty delta, target level configured, and (unless
    /// skipped) the recent filter containing at least one deleted sst.
    async fn data_cache_refill(
        context: &CacheRefillContext,
        delta: &SstDeltaInfo,
        holders: Vec<TableHolder>,
    ) {
        // Skip data cache refill if data disk cache is not enabled.
        if !context.sstable_store.block_cache().is_hybrid() {
            return;
        }

        // Return if no data to refill: no newly inserted ssts, or no deleted
        // (parent) ssts to inherit from.
        if delta.insert_sst_infos.is_empty() || delta.delete_sst_object_ids.is_empty() {
            return;
        }

        // Return if the target level is not in the refill levels
        if !context
            .config
            .data_refill_levels
            .contains(&delta.insert_sst_level)
        {
            return;
        }

        let recent_filter = context.sstable_store.recent_filter();

        // Return if recent filter is required and no deleted sst ids are in the recent filter.
        // `usize::MAX` is the "sst id only" marker (see the insert in
        // `data_file_cache_refill_unit`).
        let targets = delta
            .delete_sst_object_ids
            .iter()
            .map(|id| (*id, usize::MAX))
            .collect_vec();
        if !context.config.skip_recent_filter && !recent_filter.contains_any(targets.iter()) {
            GLOBAL_CACHE_REFILL_METRICS
                .data_refill_filtered_total
                .inc_by(delta.delete_sst_object_ids.len() as _);
            return;
        }

        GLOBAL_CACHE_REFILL_METRICS
            .data_refill_block_unfiltered_total
            .inc_by(
                holders
                    .iter()
                    .map(|sst| sst.block_count() as u64)
                    .sum::<u64>(),
            );

        // L0 output (or skipped filtering) is refilled in full; other levels
        // refill only the units inherited from recently-accessed parent
        // blocks.
        if delta.insert_sst_level == 0 || context.config.skip_recent_filter {
            Self::data_file_cache_refill_full_impl(context, delta, holders).await;
        } else {
            Self::data_file_cache_impl(context, delta, holders).await;
        }
    }

    /// Refill every unit of every new sst, without any inheritance
    /// filtering.
    async fn data_file_cache_refill_full_impl(
        context: &CacheRefillContext,
        _delta: &SstDeltaInfo,
        holders: Vec<TableHolder>,
    ) {
        let unit = context.config.unit;

        let mut futures = vec![];

        for sst in &holders {
            // Split each sst's blocks into `unit`-sized chunks.
            for blk_start in (0..sst.block_count()).step_by(unit) {
                let blk_end = std::cmp::min(sst.block_count(), blk_start + unit);
                let unit = SstableUnit {
                    sst_obj_id: sst.id,
                    blks: blk_start..blk_end,
                };
                futures.push(
                    async move { Self::data_file_cache_refill_unit(context, sst, unit).await },
                );
            }
        }
        join_all(futures).await;
    }

    /// Inheritance-based refill: look up the deleted (parent) sst metas in
    /// the cache, derive the overlapping units of the new ssts, and refill
    /// those units only.
    async fn data_file_cache_impl(
        context: &CacheRefillContext,
        delta: &SstDeltaInfo,
        holders: Vec<TableHolder>,
    ) {
        let sstable_store = context.sstable_store.clone();
        let futures = delta.delete_sst_object_ids.iter().map(|sst_obj_id| {
            let store = &sstable_store;
            async move {
                let res = store.sstable_cached(*sst_obj_id).await;
                // Only cached parent metas can contribute inheritance info.
                match res {
                    Ok(Some(_)) => GLOBAL_CACHE_REFILL_METRICS
                        .data_refill_parent_meta_lookup_hit_total
                        .inc(),
                    Ok(None) => GLOBAL_CACHE_REFILL_METRICS
                        .data_refill_parent_meta_lookup_miss_total
                        .inc(),
                    _ => {}
                }
                res
            }
        });
        let parent_ssts = match try_join_all(futures).await {
            // `flatten` drops the `None` (uncached) parents.
            Ok(parent_ssts) => parent_ssts.into_iter().flatten(),
            Err(e) => {
                return tracing::error!(error = %e.as_report(), "get old meta from cache error");
            }
        };
        let units = Self::get_units_to_refill_by_inheritance(context, &holders, parent_ssts);

        // Index the new sst metas by object id for unit lookup.
        let ssts: HashMap<HummockSstableObjectId, TableHolder> =
            holders.into_iter().map(|meta| (meta.id, meta)).collect();
        let futures = units.into_iter().map(|unit| {
            let ssts = &ssts;
            async move {
                let sst = ssts.get(&unit.sst_obj_id).unwrap();
                if let Err(e) = Self::data_file_cache_refill_unit(context, sst, unit).await {
                    tracing::error!(error = %e.as_report(), "data file cache unit refill error");
                }
            }
        });
        join_all(futures).await;
    }

    /// Refill one unit (a contiguous block range) of `sst` into the data
    /// cache.
    ///
    /// The unit's blocks are first offered to the cache's admission filter;
    /// only when the admitted fraction reaches the configured threshold is
    /// the unit's byte range downloaded (one ranged read) and decoded into
    /// individual blocks.
    async fn data_file_cache_refill_unit(
        context: &CacheRefillContext,
        sst: &Sstable,
        unit: SstableUnit,
    ) -> HummockResult<()> {
        let sstable_store = &context.sstable_store;
        let threshold = context.config.threshold;
        let recent_filter = sstable_store.recent_filter();

        // update filter for sst id only
        recent_filter.insert((sst.id, usize::MAX));

        let blocks = unit.blks.size().unwrap();

        let mut tasks = vec![];
        let mut contexts = Vec::with_capacity(blocks);
        let mut admits = 0;

        // Byte range covering the whole unit: from the first block's start
        // to the last block's end.
        let (range_first, _) = sst.calculate_block_info(unit.blks.start);
        let (range_last, _) = sst.calculate_block_info(unit.blks.end - 1);
        let range = range_first.start..range_last.end;

        let size = range.size().unwrap();

        GLOBAL_CACHE_REFILL_METRICS
            .data_refill_ideal_bytes
            .inc_by(size as _);

        for blk in unit.blks {
            let (range, uncompressed_capacity) = sst.calculate_block_info(blk);
            let key = SstableBlockIndex {
                sst_id: sst.id,
                block_idx: blk as u64,
            };

            let mut writer = sstable_store.block_cache().storage_writer(key);

            // NOTE(review): admission is judged with the whole unit's `size`
            // for each block writer, not the per-block range size — confirm
            // this is intended.
            if writer.filter(size).is_admitted() {
                admits += 1;
            }

            contexts.push((writer, range, uncompressed_capacity))
        }

        // Refill the unit only when the admitted fraction reaches the
        // threshold.
        if admits as f64 / contexts.len() as f64 >= threshold {
            let task = async move {
                GLOBAL_CACHE_REFILL_METRICS.data_refill_attempts_total.inc();

                let permit = context.concurrency.acquire().await.unwrap();

                GLOBAL_CACHE_REFILL_METRICS.data_refill_started_total.inc();

                let timer = GLOBAL_CACHE_REFILL_METRICS
                    .data_refill_success_duration
                    .start_timer();

                // One ranged read for the whole unit; blocks are sliced out
                // of the shared buffer below.
                let data = sstable_store
                    .store()
                    .read(&sstable_store.get_sst_data_path(sst.id), range.clone())
                    .await?;
                let mut futures = vec![];
                for (w, r, uc) in contexts {
                    let offset = r.start - range.start;
                    let len = r.end - r.start;
                    let bytes = data.slice(offset..offset + len);
                    let future = async move {
                        let value = Box::new(Block::decode(bytes, uc)?);
                        // The entry should always be `Some(..)`, use if here for compatible.
                        if let Some(_entry) = w.force().insert(value) {
                            GLOBAL_CACHE_REFILL_METRICS
                                .data_refill_success_bytes
                                .inc_by(len as u64);
                            GLOBAL_CACHE_REFILL_METRICS
                                .data_refill_block_success_total
                                .inc();
                        }
                        Ok::<_, HummockError>(())
                    };
                    futures.push(future);
                }
                try_join_all(futures)
                    .await
                    .map_err(HummockError::file_cache)?;

                drop(permit);
                drop(timer);

                Ok::<_, HummockError>(())
            };
            tasks.push(task);
        }

        try_join_all(tasks).await?;

        Ok(())
    }
}
717
/// Identifies a single block within an sstable object.
#[derive(Debug)]
pub struct SstableBlock {
    pub sst_obj_id: HummockSstableObjectId,
    pub blk_idx: usize,
}
723
/// A refill unit: a contiguous block range of one sstable object.
#[derive(Debug, Hash, PartialEq, Eq)]
pub struct SstableUnit {
    pub sst_obj_id: HummockSstableObjectId,
    /// Half-open block index range `[start, end)`.
    pub blks: Range<usize>,
}
729
730impl Ord for SstableUnit {
731    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
732        match self.sst_obj_id.cmp(&other.sst_obj_id) {
733            std::cmp::Ordering::Equal => {}
734            ord => return ord,
735        }
736        match self.blks.start.cmp(&other.blks.start) {
737            std::cmp::Ordering::Equal => {}
738            ord => return ord,
739        }
740        self.blks.end.cmp(&other.blks.end)
741    }
742}
743
impl PartialOrd for SstableUnit {
    /// Delegates to the total order defined by [`Ord`], keeping the two
    /// impls consistent.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
749
/// Borrowed view of one refill unit of an sstable, used while computing key
/// overlap with parent sst blocks.
#[derive(Debug)]
struct Unit<'a> {
    sst: &'a Sstable,
    /// Half-open block index range `[start, end)` covered by this unit.
    blks: Range<usize>,
}
755
756impl<'a> Unit<'a> {
757    fn new(sst: &'a Sstable, unit: usize, uidx: usize) -> Self {
758        let blks = unit * uidx..std::cmp::min(unit * (uidx + 1), sst.block_count());
759        Self { sst, blks }
760    }
761
762    fn smallest_key(&self) -> &Vec<u8> {
763        &self.sst.meta.block_metas[self.blks.start].smallest_key
764    }
765
766    // `largest_key` can be included or excluded, both are treated as included here
767    fn largest_key(&self) -> &Vec<u8> {
768        if self.blks.end == self.sst.block_count() {
769            &self.sst.meta.largest_key
770        } else {
771            &self.sst.meta.block_metas[self.blks.end].smallest_key
772        }
773    }
774
775    fn units(sst: &Sstable, unit: usize) -> usize {
776        sst.block_count() / unit
777            + if sst.block_count().is_multiple_of(unit) {
778                0
779            } else {
780                1
781            }
782    }
783}