risingwave_storage/hummock/event_handler/
refiller.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet, VecDeque};
16use std::future::poll_fn;
17use std::ops::Range;
18use std::sync::{Arc, LazyLock};
19use std::task::{Poll, ready};
20use std::time::{Duration, Instant};
21
22use foyer::{HybridCacheEntry, RangeBoundsExt};
23use futures::future::{join_all, try_join_all};
24use futures::{Future, FutureExt};
25use itertools::Itertools;
26use prometheus::core::{AtomicU64, GenericCounter, GenericCounterVec};
27use prometheus::{
28    Histogram, HistogramVec, IntGauge, Registry, register_histogram_vec_with_registry,
29    register_int_counter_vec_with_registry, register_int_gauge_with_registry,
30};
31use risingwave_common::license::Feature;
32use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
33use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo;
34use risingwave_hummock_sdk::{HummockSstableObjectId, KeyComparator};
35use thiserror_ext::AsReport;
36use tokio::sync::Semaphore;
37use tokio::task::JoinHandle;
38
39use crate::hummock::local_version::pinned_version::PinnedVersion;
40use crate::hummock::{
41    Block, HummockError, HummockResult, Sstable, SstableBlockIndex, SstableStoreRef, TableHolder,
42};
43use crate::monitor::StoreLocalStatistic;
44use crate::opts::StorageOpts;
45
/// Process-wide cache refill metrics.
///
/// Lazily created (and registered on `GLOBAL_METRICS_REGISTRY`) on first access; every refill
/// code path in this module reports through this instance.
pub static GLOBAL_CACHE_REFILL_METRICS: LazyLock<CacheRefillMetrics> =
    LazyLock::new(|| CacheRefillMetrics::new(&GLOBAL_METRICS_REGISTRY));
48
/// Metrics reported by the cache refiller.
///
/// The three metric families are labelled by `type` and `op`; the remaining fields are
/// pre-resolved labelled children of those families, kept to avoid label lookups on hot paths.
pub struct CacheRefillMetrics {
    /// Histogram family `refill_duration{type, op}`.
    pub refill_duration: HistogramVec,
    /// Counter family `refill_total{type, op}`.
    pub refill_total: GenericCounterVec<AtomicU64>,
    /// Counter family `refill_bytes{type, op}`.
    pub refill_bytes: GenericCounterVec<AtomicU64>,

    /// `refill_duration{type="data", op="success"}`: timing of data-unit refills.
    pub data_refill_success_duration: Histogram,
    /// `refill_duration{type="meta", op="success"}`: timing of sstable meta loads.
    pub meta_refill_success_duration: Histogram,

    /// `refill_total{type="data", op="filtered"}`: deltas skipped by the level / recent-filter check.
    pub data_refill_filtered_total: GenericCounter<AtomicU64>,
    /// `refill_total{type="data", op="attempts"}`: units that passed the admission threshold.
    pub data_refill_attempts_total: GenericCounter<AtomicU64>,
    /// `refill_total{type="data", op="started"}`: units that acquired a concurrency permit.
    pub data_refill_started_total: GenericCounter<AtomicU64>,
    /// `refill_total{type="meta", op="attempts"}`: sstable meta loads attempted.
    pub meta_refill_attempts_total: GenericCounter<AtomicU64>,

    /// `refill_total{type="parent_meta", op="hit"}`: parent (deleted) SST metas found in the meta cache.
    pub data_refill_parent_meta_lookup_hit_total: GenericCounter<AtomicU64>,
    /// `refill_total{type="parent_meta", op="miss"}`: parent (deleted) SST metas not in the meta cache.
    pub data_refill_parent_meta_lookup_miss_total: GenericCounter<AtomicU64>,
    /// `refill_total{type="unit_inheritance", op="hit"}`: units selected for refill by inheritance.
    pub data_refill_unit_inheritance_hit_total: GenericCounter<AtomicU64>,
    /// `refill_total{type="unit_inheritance", op="miss"}`: units not selected by inheritance.
    pub data_refill_unit_inheritance_miss_total: GenericCounter<AtomicU64>,

    /// `refill_total{type="block", op="unfiltered"}`: blocks of deltas that passed the refill filter.
    pub data_refill_block_unfiltered_total: GenericCounter<AtomicU64>,
    /// `refill_total{type="block", op="success"}`: blocks inserted into the disk cache by refill.
    pub data_refill_block_success_total: GenericCounter<AtomicU64>,

    /// `refill_bytes{type="data", op="ideal"}`: bytes spanned by every unit considered for refill.
    pub data_refill_ideal_bytes: GenericCounter<AtomicU64>,
    /// `refill_bytes{type="data", op="success"}`: bytes of blocks inserted by refill.
    pub data_refill_success_bytes: GenericCounter<AtomicU64>,

    /// `refill_queue_total`: number of refill tasks queued in `CacheRefiller`.
    pub refill_queue_total: IntGauge,
}
75
76impl CacheRefillMetrics {
77    pub fn new(registry: &Registry) -> Self {
78        let refill_duration = register_histogram_vec_with_registry!(
79            "refill_duration",
80            "refill duration",
81            &["type", "op"],
82            registry,
83        )
84        .unwrap();
85        let refill_total = register_int_counter_vec_with_registry!(
86            "refill_total",
87            "refill total",
88            &["type", "op"],
89            registry,
90        )
91        .unwrap();
92        let refill_bytes = register_int_counter_vec_with_registry!(
93            "refill_bytes",
94            "refill bytes",
95            &["type", "op"],
96            registry,
97        )
98        .unwrap();
99
100        let data_refill_success_duration = refill_duration
101            .get_metric_with_label_values(&["data", "success"])
102            .unwrap();
103        let meta_refill_success_duration = refill_duration
104            .get_metric_with_label_values(&["meta", "success"])
105            .unwrap();
106
107        let data_refill_filtered_total = refill_total
108            .get_metric_with_label_values(&["data", "filtered"])
109            .unwrap();
110        let data_refill_attempts_total = refill_total
111            .get_metric_with_label_values(&["data", "attempts"])
112            .unwrap();
113        let data_refill_started_total = refill_total
114            .get_metric_with_label_values(&["data", "started"])
115            .unwrap();
116        let meta_refill_attempts_total = refill_total
117            .get_metric_with_label_values(&["meta", "attempts"])
118            .unwrap();
119
120        let data_refill_parent_meta_lookup_hit_total = refill_total
121            .get_metric_with_label_values(&["parent_meta", "hit"])
122            .unwrap();
123        let data_refill_parent_meta_lookup_miss_total = refill_total
124            .get_metric_with_label_values(&["parent_meta", "miss"])
125            .unwrap();
126        let data_refill_unit_inheritance_hit_total = refill_total
127            .get_metric_with_label_values(&["unit_inheritance", "hit"])
128            .unwrap();
129        let data_refill_unit_inheritance_miss_total = refill_total
130            .get_metric_with_label_values(&["unit_inheritance", "miss"])
131            .unwrap();
132
133        let data_refill_block_unfiltered_total = refill_total
134            .get_metric_with_label_values(&["block", "unfiltered"])
135            .unwrap();
136        let data_refill_block_success_total = refill_total
137            .get_metric_with_label_values(&["block", "success"])
138            .unwrap();
139
140        let data_refill_ideal_bytes = refill_bytes
141            .get_metric_with_label_values(&["data", "ideal"])
142            .unwrap();
143        let data_refill_success_bytes = refill_bytes
144            .get_metric_with_label_values(&["data", "success"])
145            .unwrap();
146
147        let refill_queue_total = register_int_gauge_with_registry!(
148            "refill_queue_total",
149            "refill queue total",
150            registry,
151        )
152        .unwrap();
153
154        Self {
155            refill_duration,
156            refill_total,
157            refill_bytes,
158
159            data_refill_success_duration,
160            meta_refill_success_duration,
161            data_refill_filtered_total,
162            data_refill_attempts_total,
163            data_refill_started_total,
164            meta_refill_attempts_total,
165
166            data_refill_parent_meta_lookup_hit_total,
167            data_refill_parent_meta_lookup_miss_total,
168            data_refill_unit_inheritance_hit_total,
169            data_refill_unit_inheritance_miss_total,
170
171            data_refill_block_unfiltered_total,
172            data_refill_block_success_total,
173
174            data_refill_ideal_bytes,
175            data_refill_success_bytes,
176
177            refill_queue_total,
178        }
179    }
180}
181
/// Configuration of the hummock cache refiller.
#[derive(Debug)]
pub struct CacheRefillConfig {
    /// Cache refill timeout.
    ///
    /// Work of a refill task that has not finished within this duration is abandoned.
    pub timeout: Duration,

    /// Data file cache refill levels.
    ///
    /// Data blocks are only refilled for deltas whose inserted SSTs land on one of these levels.
    pub data_refill_levels: HashSet<u32>,

    /// Data file cache refill concurrency.
    ///
    /// Maximum number of data units refilled concurrently, shared by all tasks of a refiller.
    pub concurrency: usize,

    /// Data file cache refill unit (blocks).
    ///
    /// Number of consecutive blocks fetched by a single read during data refill.
    pub unit: usize,

    /// Data file cache refill unit threshold.
    ///
    /// Only units whose admit rate (admitted blocks / blocks in the unit) is `>=` the threshold
    /// will be refilled.
    pub threshold: f64,
}
201
202impl CacheRefillConfig {
203    pub fn from_storage_opts(options: &StorageOpts) -> Self {
204        let data_refill_levels = match Feature::ElasticDiskCache.check_available() {
205            Ok(_) => options
206                .cache_refill_data_refill_levels
207                .iter()
208                .copied()
209                .collect(),
210            Err(e) => {
211                tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
212                HashSet::new()
213            }
214        };
215
216        Self {
217            timeout: Duration::from_millis(options.cache_refill_timeout_ms),
218            data_refill_levels,
219            concurrency: options.cache_refill_concurrency,
220            unit: options.cache_refill_unit,
221            threshold: options.cache_refill_threshold,
222        }
223    }
224}
225
/// A spawned refill task paired with the event that is released once it completes.
struct Item {
    /// Handle of the spawned refill task.
    handle: JoinHandle<()>,
    /// Event yielded by `CacheRefiller::next_event` after `handle` has completed.
    event: CacheRefillerEvent,
}
230
/// Spawns a refill task and returns its join handle.
///
/// Arguments, in order: the SST deltas to refill, the shared refill context, the currently
/// pinned version and the new version being switched to.
pub(crate) type SpawnRefillTask = Arc<
    // first current version, second new version
    dyn Fn(Vec<SstDeltaInfo>, CacheRefillContext, PinnedVersion, PinnedVersion) -> JoinHandle<()>
        + Send
        + Sync
        + 'static,
>;
238
/// A cache refiller for hummock data.
///
/// Spawns one refill task per version switch and hands the matching events back, in
/// submission order, once the tasks have completed.
pub(crate) struct CacheRefiller {
    /// order: old => new
    queue: VecDeque<Item>,

    /// Shared refill state; a clone is handed to every spawned task.
    context: CacheRefillContext,

    /// Spawner used to start each refill task.
    spawn_refill_task: SpawnRefillTask,
}
248
249impl CacheRefiller {
250    pub(crate) fn new(
251        config: CacheRefillConfig,
252        sstable_store: SstableStoreRef,
253        spawn_refill_task: SpawnRefillTask,
254    ) -> Self {
255        let config = Arc::new(config);
256        let concurrency = Arc::new(Semaphore::new(config.concurrency));
257        Self {
258            queue: VecDeque::new(),
259            context: CacheRefillContext {
260                config,
261                concurrency,
262                sstable_store,
263            },
264            spawn_refill_task,
265        }
266    }
267
268    pub(crate) fn default_spawn_refill_task() -> SpawnRefillTask {
269        Arc::new(|deltas, context, _, _| {
270            let task = CacheRefillTask { deltas, context };
271            tokio::spawn(task.run())
272        })
273    }
274
275    pub(crate) fn start_cache_refill(
276        &mut self,
277        deltas: Vec<SstDeltaInfo>,
278        pinned_version: PinnedVersion,
279        new_pinned_version: PinnedVersion,
280    ) {
281        let handle = (self.spawn_refill_task)(
282            deltas,
283            self.context.clone(),
284            pinned_version.clone(),
285            new_pinned_version.clone(),
286        );
287        let event = CacheRefillerEvent {
288            pinned_version,
289            new_pinned_version,
290        };
291        let item = Item { handle, event };
292        self.queue.push_back(item);
293        GLOBAL_CACHE_REFILL_METRICS.refill_queue_total.add(1);
294    }
295
296    pub(crate) fn last_new_pinned_version(&self) -> Option<&PinnedVersion> {
297        self.queue.back().map(|item| &item.event.new_pinned_version)
298    }
299}
300
301impl CacheRefiller {
302    pub(crate) fn next_event(&mut self) -> impl Future<Output = CacheRefillerEvent> + '_ {
303        poll_fn(|cx| {
304            if let Some(item) = self.queue.front_mut() {
305                ready!(item.handle.poll_unpin(cx)).unwrap();
306                let item = self.queue.pop_front().unwrap();
307                GLOBAL_CACHE_REFILL_METRICS.refill_queue_total.sub(1);
308                return Poll::Ready(item.event);
309            }
310            Poll::Pending
311        })
312    }
313}
314
/// Event yielded by `CacheRefiller::next_event` once the refill for a version switch finished.
pub struct CacheRefillerEvent {
    /// Version that was pinned when the refill was started.
    pub pinned_version: PinnedVersion,
    /// New version the refill was started for.
    pub new_pinned_version: PinnedVersion,
}
319
/// State shared by every refill task spawned from one `CacheRefiller`.
///
/// `config` and `concurrency` are `Arc`s, so clones share them.
/// NOTE(review): `SstableStoreRef` is presumably a shared handle as well — confirm.
#[derive(Clone)]
pub(crate) struct CacheRefillContext {
    /// Refill configuration.
    config: Arc<CacheRefillConfig>,
    /// Semaphore bounding concurrent data-unit refills.
    concurrency: Arc<Semaphore>,
    /// Store used to load sstable metas and read data blocks.
    sstable_store: SstableStoreRef,
}
326
/// One refill job: warms the caches for a batch of SST deltas.
struct CacheRefillTask {
    /// Deltas whose inserted SSTs should be refilled.
    deltas: Vec<SstDeltaInfo>,
    /// Shared configuration, concurrency limiter and sstable store.
    context: CacheRefillContext,
}
331
332impl CacheRefillTask {
333    async fn run(self) {
334        let tasks = self
335            .deltas
336            .iter()
337            .map(|delta| {
338                let context = self.context.clone();
339                async move {
340                    let holders = match Self::meta_cache_refill(&context, delta).await {
341                        Ok(holders) => holders,
342                        Err(e) => {
343                            tracing::warn!(error = %e.as_report(), "meta cache refill error");
344                            return;
345                        }
346                    };
347                    Self::data_cache_refill(&context, delta, holders).await;
348                }
349            })
350            .collect_vec();
351        let future = join_all(tasks);
352
353        let _ = tokio::time::timeout(self.context.config.timeout, future).await;
354    }
355
356    async fn meta_cache_refill(
357        context: &CacheRefillContext,
358        delta: &SstDeltaInfo,
359    ) -> HummockResult<Vec<TableHolder>> {
360        let tasks = delta
361            .insert_sst_infos
362            .iter()
363            .map(|info| async {
364                let mut stats = StoreLocalStatistic::default();
365                GLOBAL_CACHE_REFILL_METRICS.meta_refill_attempts_total.inc();
366
367                let now = Instant::now();
368                let res = context.sstable_store.sstable(info, &mut stats).await;
369                stats.discard();
370                GLOBAL_CACHE_REFILL_METRICS
371                    .meta_refill_success_duration
372                    .observe(now.elapsed().as_secs_f64());
373                res
374            })
375            .collect_vec();
376        let holders = try_join_all(tasks).await?;
377        Ok(holders)
378    }
379
380    /// Get sstable inheritance info in unit level.
381    fn get_units_to_refill_by_inheritance(
382        context: &CacheRefillContext,
383        ssts: &[TableHolder],
384        parent_ssts: impl IntoIterator<Item = HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>>,
385    ) -> HashSet<SstableUnit> {
386        let mut res = HashSet::default();
387
388        let Some(filter) = context.sstable_store.data_recent_filter() else {
389            return res;
390        };
391
392        let units = {
393            let unit = context.config.unit;
394            ssts.iter()
395                .flat_map(|sst| {
396                    let units = Unit::units(sst, unit);
397                    (0..units).map(|uidx| Unit::new(sst, unit, uidx))
398                })
399                .collect_vec()
400        };
401
402        if cfg!(debug_assertions) {
403            // assert units in asc order
404            units.iter().tuple_windows().for_each(|(a, b)| {
405                debug_assert_ne!(
406                    KeyComparator::compare_encoded_full_key(a.largest_key(), b.smallest_key()),
407                    std::cmp::Ordering::Greater
408                )
409            });
410        }
411
412        for psst in parent_ssts {
413            for pblk in 0..psst.block_count() {
414                let pleft = &psst.meta.block_metas[pblk].smallest_key;
415                let pright = if pblk + 1 == psst.block_count() {
416                    // `largest_key` can be included or excluded, both are treated as included here
417                    &psst.meta.largest_key
418                } else {
419                    &psst.meta.block_metas[pblk + 1].smallest_key
420                };
421
422                // partition point: unit.right < pblk.left
423                let uleft = units.partition_point(|unit| {
424                    KeyComparator::compare_encoded_full_key(unit.largest_key(), pleft)
425                        == std::cmp::Ordering::Less
426                });
427                // partition point: unit.left <= pblk.right
428                let uright = units.partition_point(|unit| {
429                    KeyComparator::compare_encoded_full_key(unit.smallest_key(), pright)
430                        != std::cmp::Ordering::Greater
431                });
432
433                // overlapping: uleft..uright
434                for u in units.iter().take(uright).skip(uleft) {
435                    let unit = SstableUnit {
436                        sst_obj_id: u.sst.id,
437                        blks: u.blks.clone(),
438                    };
439                    if res.contains(&unit) {
440                        continue;
441                    }
442                    if filter.contains(&(psst.id, pblk)) {
443                        res.insert(unit);
444                    }
445                }
446            }
447        }
448
449        let hit = res.len();
450        let miss = units.len() - res.len();
451        GLOBAL_CACHE_REFILL_METRICS
452            .data_refill_unit_inheritance_hit_total
453            .inc_by(hit as u64);
454        GLOBAL_CACHE_REFILL_METRICS
455            .data_refill_unit_inheritance_miss_total
456            .inc_by(miss as u64);
457
458        res
459    }
460
461    /// Data cache refill entry point.
462    async fn data_cache_refill(
463        context: &CacheRefillContext,
464        delta: &SstDeltaInfo,
465        holders: Vec<TableHolder>,
466    ) {
467        // Skip data cache refill if data disk cache is not enabled.
468        if !context.sstable_store.block_cache().is_hybrid() {
469            return;
470        }
471
472        // return if no data to refill
473        if delta.insert_sst_infos.is_empty() || delta.delete_sst_object_ids.is_empty() {
474            return;
475        }
476
477        // return if data file cache is disabled
478        let Some(filter) = context.sstable_store.data_recent_filter() else {
479            return;
480        };
481
482        // return if recent filter miss
483        if !context
484            .config
485            .data_refill_levels
486            .contains(&delta.insert_sst_level)
487            || !delta
488                .delete_sst_object_ids
489                .iter()
490                .any(|&id| filter.contains(&(id, usize::MAX)))
491        {
492            GLOBAL_CACHE_REFILL_METRICS.data_refill_filtered_total.inc();
493            return;
494        }
495
496        GLOBAL_CACHE_REFILL_METRICS
497            .data_refill_block_unfiltered_total
498            .inc_by(
499                holders
500                    .iter()
501                    .map(|sst| sst.block_count() as u64)
502                    .sum::<u64>(),
503            );
504
505        if delta.insert_sst_level == 0 {
506            Self::data_file_cache_refill_l0_impl(context, delta, holders).await;
507        } else {
508            Self::data_file_cache_impl(context, delta, holders).await;
509        }
510    }
511
512    async fn data_file_cache_refill_l0_impl(
513        context: &CacheRefillContext,
514        _delta: &SstDeltaInfo,
515        holders: Vec<TableHolder>,
516    ) {
517        let unit = context.config.unit;
518
519        let mut futures = vec![];
520
521        for sst in &holders {
522            for blk_start in (0..sst.block_count()).step_by(unit) {
523                let blk_end = std::cmp::min(sst.block_count(), blk_start + unit);
524                let unit = SstableUnit {
525                    sst_obj_id: sst.id,
526                    blks: blk_start..blk_end,
527                };
528                futures.push(
529                    async move { Self::data_file_cache_refill_unit(context, sst, unit).await },
530                );
531            }
532        }
533        join_all(futures).await;
534    }
535
536    async fn data_file_cache_impl(
537        context: &CacheRefillContext,
538        delta: &SstDeltaInfo,
539        holders: Vec<TableHolder>,
540    ) {
541        let sstable_store = context.sstable_store.clone();
542        let futures = delta.delete_sst_object_ids.iter().map(|sst_obj_id| {
543            let store = &sstable_store;
544            async move {
545                let res = store.sstable_cached(*sst_obj_id).await;
546                match res {
547                    Ok(Some(_)) => GLOBAL_CACHE_REFILL_METRICS
548                        .data_refill_parent_meta_lookup_hit_total
549                        .inc(),
550                    Ok(None) => GLOBAL_CACHE_REFILL_METRICS
551                        .data_refill_parent_meta_lookup_miss_total
552                        .inc(),
553                    _ => {}
554                }
555                res
556            }
557        });
558        let parent_ssts = match try_join_all(futures).await {
559            Ok(parent_ssts) => parent_ssts.into_iter().flatten(),
560            Err(e) => {
561                return tracing::error!(error = %e.as_report(), "get old meta from cache error");
562            }
563        };
564        let units = Self::get_units_to_refill_by_inheritance(context, &holders, parent_ssts);
565
566        let ssts: HashMap<HummockSstableObjectId, TableHolder> =
567            holders.into_iter().map(|meta| (meta.id, meta)).collect();
568        let futures = units.into_iter().map(|unit| {
569            let ssts = &ssts;
570            async move {
571                let sst = ssts.get(&unit.sst_obj_id).unwrap();
572                if let Err(e) = Self::data_file_cache_refill_unit(context, sst, unit).await {
573                    tracing::error!(error = %e.as_report(), "data file cache unit refill error");
574                }
575            }
576        });
577        join_all(futures).await;
578    }
579
580    async fn data_file_cache_refill_unit(
581        context: &CacheRefillContext,
582        sst: &Sstable,
583        unit: SstableUnit,
584    ) -> HummockResult<()> {
585        let sstable_store = &context.sstable_store;
586        let threshold = context.config.threshold;
587
588        // update filter for sst id only
589        if let Some(filter) = sstable_store.data_recent_filter() {
590            filter.insert((sst.id, usize::MAX));
591        }
592
593        let blocks = unit.blks.size().unwrap();
594
595        let mut tasks = vec![];
596        let mut contexts = Vec::with_capacity(blocks);
597        let mut admits = 0;
598
599        let (range_first, _) = sst.calculate_block_info(unit.blks.start);
600        let (range_last, _) = sst.calculate_block_info(unit.blks.end - 1);
601        let range = range_first.start..range_last.end;
602
603        GLOBAL_CACHE_REFILL_METRICS
604            .data_refill_ideal_bytes
605            .inc_by(range.size().unwrap() as u64);
606
607        for blk in unit.blks {
608            let (range, uncompressed_capacity) = sst.calculate_block_info(blk);
609            let key = SstableBlockIndex {
610                sst_id: sst.id,
611                block_idx: blk as u64,
612            };
613
614            let mut writer = sstable_store.block_cache().storage_writer(key);
615
616            if writer.pick().admitted() {
617                admits += 1;
618            }
619
620            contexts.push((writer, range, uncompressed_capacity))
621        }
622
623        if admits as f64 / contexts.len() as f64 >= threshold {
624            let task = async move {
625                GLOBAL_CACHE_REFILL_METRICS.data_refill_attempts_total.inc();
626
627                let permit = context.concurrency.acquire().await.unwrap();
628
629                GLOBAL_CACHE_REFILL_METRICS.data_refill_started_total.inc();
630
631                let timer = GLOBAL_CACHE_REFILL_METRICS
632                    .data_refill_success_duration
633                    .start_timer();
634
635                let data = sstable_store
636                    .store()
637                    .read(&sstable_store.get_sst_data_path(sst.id), range.clone())
638                    .await?;
639                let mut futures = vec![];
640                for (w, r, uc) in contexts {
641                    let offset = r.start - range.start;
642                    let len = r.end - r.start;
643                    let bytes = data.slice(offset..offset + len);
644                    let future = async move {
645                        let value = Box::new(Block::decode(bytes, uc)?);
646                        // The entry should always be `Some(..)`, use if here for compatible.
647                        if let Some(_entry) = w.force().insert(value) {
648                            GLOBAL_CACHE_REFILL_METRICS
649                                .data_refill_success_bytes
650                                .inc_by(len as u64);
651                            GLOBAL_CACHE_REFILL_METRICS
652                                .data_refill_block_success_total
653                                .inc();
654                        }
655                        Ok::<_, HummockError>(())
656                    };
657                    futures.push(future);
658                }
659                try_join_all(futures)
660                    .await
661                    .map_err(HummockError::file_cache)?;
662
663                drop(permit);
664                drop(timer);
665
666                Ok::<_, HummockError>(())
667            };
668            tasks.push(task);
669        }
670
671        try_join_all(tasks).await?;
672
673        Ok(())
674    }
675}
676
/// Identifies a single block of an SST.
///
/// NOTE(review): not referenced in the code shown here — confirm whether it is still used.
#[derive(Debug)]
pub struct SstableBlock {
    /// Object id of the SST containing the block.
    pub sst_obj_id: HummockSstableObjectId,
    /// Index of the block within the SST.
    pub blk_idx: usize,
}
682
/// A contiguous range of blocks of one SST; the granularity at which data refill reads.
#[derive(Debug, Hash, PartialEq, Eq)]
pub struct SstableUnit {
    /// Object id of the SST the blocks belong to.
    pub sst_obj_id: HummockSstableObjectId,
    /// Half-open range of block indices within the SST.
    pub blks: Range<usize>,
}
688
689impl Ord for SstableUnit {
690    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
691        match self.sst_obj_id.cmp(&other.sst_obj_id) {
692            std::cmp::Ordering::Equal => {}
693            ord => return ord,
694        }
695        match self.blks.start.cmp(&other.blks.start) {
696            std::cmp::Ordering::Equal => {}
697            ord => return ord,
698        }
699        self.blks.end.cmp(&other.blks.end)
700    }
701}
702
703impl PartialOrd for SstableUnit {
704    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
705        Some(self.cmp(other))
706    }
707}
708
/// A borrowed view of one refill unit of an SST, used to compare unit key ranges.
#[derive(Debug)]
struct Unit<'a> {
    /// SST the unit belongs to.
    sst: &'a Sstable,
    /// Half-open range of block indices covered by the unit.
    blks: Range<usize>,
}
714
715impl<'a> Unit<'a> {
716    fn new(sst: &'a Sstable, unit: usize, uidx: usize) -> Self {
717        let blks = unit * uidx..std::cmp::min(unit * (uidx + 1), sst.block_count());
718        Self { sst, blks }
719    }
720
721    fn smallest_key(&self) -> &Vec<u8> {
722        &self.sst.meta.block_metas[self.blks.start].smallest_key
723    }
724
725    // `largest_key` can be included or excluded, both are treated as included here
726    fn largest_key(&self) -> &Vec<u8> {
727        if self.blks.end == self.sst.block_count() {
728            &self.sst.meta.largest_key
729        } else {
730            &self.sst.meta.block_metas[self.blks.end].smallest_key
731        }
732    }
733
734    fn units(sst: &Sstable, unit: usize) -> usize {
735        sst.block_count() / unit + if sst.block_count() % unit == 0 { 0 } else { 1 }
736    }
737}