risingwave_storage/hummock/
test_utils.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::min;
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use bytes::Bytes;
20use foyer::{
21    Hint, HybridCache, HybridCacheBuilder, StorageKey as HybridKey, StorageValue as HybridValue,
22};
23use futures::TryFutureExt;
24use itertools::Itertools;
25use risingwave_common::catalog::TableId;
26use risingwave_common::config::EvictionConfig;
27use risingwave_common::hash::VirtualNode;
28use risingwave_common::util::epoch::test_epoch;
29use risingwave_common::util::row_serde::OrderedRowSerde;
30use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange, UserKey};
31use risingwave_hummock_sdk::key_range::KeyRange;
32use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
33use risingwave_hummock_sdk::{
34    EpochWithGap, HummockEpoch, HummockReadEpoch, HummockSstableObjectId,
35};
36
37use super::iterator::test_utils::iterator_test_table_key_of;
38use super::{
39    DEFAULT_RESTART_INTERVAL, HummockResult, InMemWriter, SstableMeta, SstableWriterOptions,
40};
41use crate::StateStore;
42use crate::compaction_catalog_manager::{
43    CompactionCatalogAgent, FilterKeyExtractorImpl, FullKeyFilterKeyExtractor,
44};
45use crate::error::StorageResult;
46use crate::hummock::shared_buffer::shared_buffer_batch::{
47    SharedBufferBatch, SharedBufferItem, SharedBufferValue,
48};
49use crate::hummock::value::HummockValue;
50use crate::hummock::{
51    BlockedXor16FilterBuilder, CachePolicy, FilterBuilder, LruCache, Sstable, SstableBuilder,
52    SstableBuilderOptions, SstableStoreRef, SstableWriter, TableHolder, Xor16FilterBuilder,
53};
54use crate::monitor::StoreLocalStatistic;
55use crate::opts::StorageOpts;
56use crate::storage_value::StorageValue;
57use crate::store::*;
58
59pub fn default_opts_for_test() -> StorageOpts {
60    StorageOpts {
61        sstable_size_mb: 4,
62        block_size_kb: 64,
63        bloom_false_positive: 0.1,
64        share_buffers_sync_parallelism: 2,
65        share_buffer_compaction_worker_threads_number: 1,
66        shared_buffer_capacity_mb: 64,
67        data_directory: "hummock_001".to_owned(),
68        write_conflict_detection_enabled: true,
69        block_cache_capacity_mb: 64,
70        meta_cache_capacity_mb: 64,
71        block_cache_eviction_config: EvictionConfig::for_test(),
72        disable_remote_compactor: false,
73        share_buffer_upload_concurrency: 1,
74        compactor_memory_limit_mb: 64,
75        sstable_id_remote_fetch_number: 1,
76        vector_file_block_size_kb: 8,
77        ..Default::default()
78    }
79}
80
81pub fn gen_dummy_batch(n: u64) -> Vec<SharedBufferItem> {
82    vec![(
83        TableKey(Bytes::from(iterator_test_table_key_of(n as usize))),
84        SharedBufferValue::Insert(Bytes::copy_from_slice(&b"value1"[..])),
85    )]
86}
87
88pub fn gen_dummy_batch_several_keys(n: usize) -> Vec<(TableKey<Bytes>, StorageValue)> {
89    let mut kvs = vec![];
90    let v = Bytes::from(b"value1".to_vec().repeat(100));
91    for idx in 0..n {
92        kvs.push((
93            TableKey(Bytes::from(iterator_test_table_key_of(idx))),
94            StorageValue::new_put(v.clone()),
95        ));
96    }
97    kvs
98}
99
100pub fn gen_dummy_sst_info(
101    id: u64,
102    batches: Vec<SharedBufferBatch>,
103    table_id: TableId,
104    epoch: HummockEpoch,
105) -> SstableInfo {
106    let mut min_table_key: Vec<u8> = batches[0].start_table_key().to_vec();
107    let mut max_table_key: Vec<u8> = batches[0].end_table_key().to_vec();
108    let mut file_size = 0;
109    for batch in batches.iter().skip(1) {
110        if min_table_key.as_slice() > *batch.start_table_key() {
111            min_table_key = batch.start_table_key().to_vec();
112        }
113        if max_table_key.as_slice() < *batch.end_table_key() {
114            max_table_key = batch.end_table_key().to_vec();
115        }
116        file_size += batch.size() as u64;
117    }
118    SstableInfoInner {
119        object_id: id.into(),
120        sst_id: id.into(),
121        key_range: KeyRange {
122            left: Bytes::from(FullKey::for_test(table_id, min_table_key, epoch).encode()),
123            right: Bytes::from(FullKey::for_test(table_id, max_table_key, epoch).encode()),
124            right_exclusive: false,
125        },
126        file_size,
127        table_ids: vec![table_id],
128        uncompressed_file_size: file_size,
129        min_epoch: epoch,
130        max_epoch: epoch,
131        sst_size: file_size,
132        ..Default::default()
133    }
134    .into()
135}
136
137/// Number of keys in table generated in `generate_table`.
138pub const TEST_KEYS_COUNT: usize = 10000;
139
140pub fn default_builder_opt_for_test() -> SstableBuilderOptions {
141    SstableBuilderOptions {
142        capacity: 256 * (1 << 20), // 256MB
143        block_capacity: 4096,      // 4KB
144        restart_interval: DEFAULT_RESTART_INTERVAL,
145        bloom_false_positive: 0.1,
146        ..Default::default()
147    }
148}
149
150pub fn default_writer_opt_for_test() -> SstableWriterOptions {
151    SstableWriterOptions {
152        capacity_hint: None,
153        tracker: None,
154        policy: CachePolicy::Disable,
155    }
156}
157
158pub fn mock_sst_writer(opt: &SstableBuilderOptions) -> InMemWriter {
159    InMemWriter::from(opt)
160}
161
162/// Generates sstable data and metadata from given `kv_iter`
163pub async fn gen_test_sstable_data(
164    opts: SstableBuilderOptions,
165    kv_iter: impl Iterator<Item = (FullKey<Vec<u8>>, HummockValue<Vec<u8>>)>,
166) -> (Bytes, SstableMeta) {
167    let table_id_to_vnode = HashMap::from_iter(vec![(
168        TableId::default().as_raw_id(),
169        VirtualNode::COUNT_FOR_TEST,
170    )]);
171    let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
172    let mut b = SstableBuilder::for_test(
173        0,
174        mock_sst_writer(&opts),
175        opts,
176        table_id_to_vnode,
177        table_id_to_watermark_serde,
178    );
179    for (key, value) in kv_iter {
180        b.add_for_test(key.to_ref(), value.as_slice())
181            .await
182            .unwrap();
183    }
184    let output = b.finish().await.unwrap();
185    output.writer_output
186}
187
188/// Write the data and meta to `sstable_store`.
189pub async fn put_sst(
190    sst_object_id: u64,
191    data: Bytes,
192    mut meta: SstableMeta,
193    sstable_store: SstableStoreRef,
194    mut options: SstableWriterOptions,
195    table_ids: Vec<u32>,
196) -> HummockResult<SstableInfo> {
197    options.policy = CachePolicy::NotFill;
198    let mut writer = sstable_store
199        .clone()
200        .create_sst_writer(sst_object_id, options);
201    for block_meta in &meta.block_metas {
202        let offset = block_meta.offset as usize;
203        let end_offset = offset + block_meta.len as usize;
204        writer
205            .write_block(&data[offset..end_offset], block_meta)
206            .await?;
207    }
208
209    // dummy
210    let bloom_filter = {
211        let mut filter_builder = BlockedXor16FilterBuilder::new(100);
212        for _ in &meta.block_metas {
213            filter_builder.switch_block(None);
214        }
215
216        filter_builder.finish(None)
217    };
218
219    meta.meta_offset = writer.data_len() as u64;
220    meta.bloom_filter = bloom_filter;
221    let sst = SstableInfoInner {
222        object_id: sst_object_id.into(),
223        sst_id: sst_object_id.into(),
224        key_range: KeyRange {
225            left: Bytes::from(meta.smallest_key.clone()),
226            right: Bytes::from(meta.largest_key.clone()),
227            right_exclusive: false,
228        },
229        file_size: meta.estimated_size as u64,
230        meta_offset: meta.meta_offset,
231        uncompressed_file_size: meta.estimated_size as u64,
232        table_ids: table_ids.into_iter().map(Into::into).collect(),
233        ..Default::default()
234    }
235    .into();
236    let writer_output = writer.finish(meta).await?;
237    writer_output.await.unwrap()?;
238    Ok(sst)
239}
240
241/// Generates a test table from the given `kv_iter` and put the kv value to `sstable_store`
242pub async fn gen_test_sstable_impl<B: AsRef<[u8]> + Clone + Default + Eq, F: FilterBuilder>(
243    opts: SstableBuilderOptions,
244    object_id: u64,
245    kv_iter: impl IntoIterator<Item = (FullKey<B>, HummockValue<B>)>,
246    sstable_store: SstableStoreRef,
247    policy: CachePolicy,
248    table_id_to_vnode: HashMap<u32, usize>,
249    table_id_to_watermark_serde: HashMap<u32, Option<(OrderedRowSerde, OrderedRowSerde, usize)>>,
250) -> SstableInfo {
251    let writer_opts = SstableWriterOptions {
252        capacity_hint: None,
253        tracker: None,
254        policy,
255    };
256    let writer = sstable_store
257        .clone()
258        .create_sst_writer(object_id, writer_opts);
259
260    let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
261        FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
262        table_id_to_vnode
263            .into_iter()
264            .map(|(table_id, v)| (table_id.into(), v))
265            .collect(),
266        table_id_to_watermark_serde
267            .into_iter()
268            .map(|(table_id, v)| (table_id.into(), v))
269            .collect(),
270        HashMap::default(),
271    ));
272
273    let mut b = SstableBuilder::<_, F>::new(
274        object_id,
275        writer,
276        F::create(opts.bloom_false_positive, opts.capacity / 16),
277        opts,
278        compaction_catalog_agent_ref,
279        None,
280    );
281
282    let mut last_key = FullKey::<B>::default();
283    for (key, value) in kv_iter {
284        let is_new_user_key =
285            last_key.is_empty() || key.user_key.as_ref() != last_key.user_key.as_ref();
286        if is_new_user_key {
287            last_key = key.clone();
288        }
289
290        b.add(key.to_ref(), value.as_slice()).await.unwrap();
291    }
292    let output = b.finish().await.unwrap();
293    output.writer_output.await.unwrap().unwrap();
294    output.sst_info.sst_info
295}
296
297/// Generate a test table from the given `kv_iter` and put the kv value to `sstable_store`
298pub async fn gen_test_sstable<B: AsRef<[u8]> + Clone + Default + Eq>(
299    opts: SstableBuilderOptions,
300    object_id: u64,
301    kv_iter: impl Iterator<Item = (FullKey<B>, HummockValue<B>)>,
302    sstable_store: SstableStoreRef,
303) -> (TableHolder, SstableInfo) {
304    let table_id_to_vnode = HashMap::from_iter(vec![(
305        TableId::default().as_raw_id(),
306        VirtualNode::COUNT_FOR_TEST,
307    )]);
308
309    let table_id_to_watermark_serde =
310        HashMap::from_iter(vec![(TableId::default().as_raw_id(), None)]);
311
312    let sst_info = gen_test_sstable_impl::<_, Xor16FilterBuilder>(
313        opts,
314        object_id,
315        kv_iter,
316        sstable_store.clone(),
317        CachePolicy::NotFill,
318        table_id_to_vnode,
319        table_id_to_watermark_serde,
320    )
321    .await;
322
323    (
324        sstable_store
325            .sstable(&sst_info, &mut StoreLocalStatistic::default())
326            .await
327            .unwrap(),
328        sst_info,
329    )
330}
331
332pub async fn gen_test_sstable_with_table_ids<B: AsRef<[u8]> + Clone + Default + Eq>(
333    opts: SstableBuilderOptions,
334    object_id: u64,
335    kv_iter: impl Iterator<Item = (FullKey<B>, HummockValue<B>)>,
336    sstable_store: SstableStoreRef,
337    table_ids: Vec<u32>,
338) -> (TableHolder, SstableInfo) {
339    let table_id_to_vnode = table_ids
340        .iter()
341        .map(|table_id| (*table_id, VirtualNode::COUNT_FOR_TEST))
342        .collect();
343    let table_id_to_watermark_serde = table_ids.iter().map(|table_id| (*table_id, None)).collect();
344
345    let sst_info = gen_test_sstable_impl::<_, Xor16FilterBuilder>(
346        opts,
347        object_id,
348        kv_iter,
349        sstable_store.clone(),
350        CachePolicy::NotFill,
351        table_id_to_vnode,
352        table_id_to_watermark_serde,
353    )
354    .await;
355
356    (
357        sstable_store
358            .sstable(&sst_info, &mut StoreLocalStatistic::default())
359            .await
360            .unwrap(),
361        sst_info,
362    )
363}
364
365/// Generate a test table from the given `kv_iter` and put the kv value to `sstable_store`
366pub async fn gen_test_sstable_info<B: AsRef<[u8]> + Clone + Default + Eq>(
367    opts: SstableBuilderOptions,
368    object_id: u64,
369    kv_iter: impl IntoIterator<Item = (FullKey<B>, HummockValue<B>)>,
370    sstable_store: SstableStoreRef,
371) -> SstableInfo {
372    let table_id_to_vnode = HashMap::from_iter(vec![(
373        TableId::default().as_raw_id(),
374        VirtualNode::COUNT_FOR_TEST,
375    )]);
376
377    let table_id_to_watermark_serde =
378        HashMap::from_iter(vec![(TableId::default().as_raw_id(), None)]);
379
380    gen_test_sstable_impl::<_, BlockedXor16FilterBuilder>(
381        opts,
382        object_id,
383        kv_iter,
384        sstable_store,
385        CachePolicy::NotFill,
386        table_id_to_vnode,
387        table_id_to_watermark_serde,
388    )
389    .await
390}
391
392/// Generate a test table from the given `kv_iter` and put the kv value to `sstable_store`
393pub async fn gen_test_sstable_with_range_tombstone(
394    opts: SstableBuilderOptions,
395    object_id: u64,
396    kv_iter: impl Iterator<Item = (FullKey<Vec<u8>>, HummockValue<Vec<u8>>)>,
397    sstable_store: SstableStoreRef,
398) -> SstableInfo {
399    let table_id_to_vnode = HashMap::from_iter(vec![(
400        TableId::default().as_raw_id(),
401        VirtualNode::COUNT_FOR_TEST,
402    )]);
403
404    let table_id_to_watermark_serde =
405        HashMap::from_iter(vec![(TableId::default().as_raw_id(), None)]);
406
407    gen_test_sstable_impl::<_, Xor16FilterBuilder>(
408        opts,
409        object_id,
410        kv_iter,
411        sstable_store.clone(),
412        CachePolicy::Fill(Hint::Normal),
413        table_id_to_vnode,
414        table_id_to_watermark_serde,
415    )
416    .await
417}
418
419/// Generates a user key with table id 0 and the given `table_key`
420pub fn test_user_key(table_key: impl AsRef<[u8]>) -> UserKey<Vec<u8>> {
421    UserKey::for_test(TableId::default(), table_key.as_ref().to_vec())
422}
423
424/// Generates a user key with table id 0 and table key format of `key_test_{idx * 2}`
425pub fn test_user_key_of(idx: usize) -> UserKey<Vec<u8>> {
426    let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
427    table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
428    UserKey::for_test(TableId::default(), table_key)
429}
430
431/// Generates a full key with table id 0 and epoch 123. User key is created with `test_user_key_of`.
432pub fn test_key_of(idx: usize) -> FullKey<Vec<u8>> {
433    FullKey {
434        user_key: test_user_key_of(idx),
435        epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
436    }
437}
438
439/// The value of an index in the test table
440pub fn test_value_of(idx: usize) -> Vec<u8> {
441    "23332333"
442        .as_bytes()
443        .iter()
444        .cycle()
445        .cloned()
446        .take(idx % 100 + 1) // so that the table is not too big
447        .collect_vec()
448}
449
450/// Generates a test table used in almost all table-related tests. Developers may verify the
451/// correctness of their implementations by comparing the got value and the expected value
452/// generated by `test_key_of` and `test_value_of`.
453pub async fn gen_default_test_sstable(
454    opts: SstableBuilderOptions,
455    object_id: u64,
456    sstable_store: SstableStoreRef,
457) -> (TableHolder, SstableInfo) {
458    gen_test_sstable(
459        opts,
460        object_id,
461        (0..TEST_KEYS_COUNT).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
462        sstable_store,
463    )
464    .await
465}
466
467pub async fn count_stream(mut i: impl StateStoreIter) -> usize {
468    let mut c: usize = 0;
469    while i.try_next().await.unwrap().is_some() {
470        c += 1
471    }
472    c
473}
474
475pub fn create_small_table_cache() -> Arc<LruCache<HummockSstableObjectId, Box<Sstable>>> {
476    Arc::new(LruCache::new(1, 4, 0))
477}
478
479pub async fn hybrid_cache_for_test<K, V>() -> HybridCache<K, V>
480where
481    K: HybridKey,
482    V: HybridValue,
483{
484    HybridCacheBuilder::new()
485        .memory(10)
486        .storage()
487        .build()
488        .await
489        .unwrap()
490}
491
492#[derive(Default, Clone)]
493pub struct StateStoreTestReadOptions {
494    pub table_id: TableId,
495    pub prefix_hint: Option<Bytes>,
496    pub prefetch_options: PrefetchOptions,
497    pub cache_policy: CachePolicy,
498    pub read_committed: bool,
499    pub retention_seconds: Option<u32>,
500    pub read_version_from_backup: bool,
501}
502
503impl StateStoreTestReadOptions {
504    fn get_read_epoch(&self, epoch: u64) -> HummockReadEpoch {
505        if self.read_version_from_backup {
506            HummockReadEpoch::Backup(epoch)
507        } else if self.read_committed {
508            HummockReadEpoch::Committed(epoch)
509        } else {
510            HummockReadEpoch::NoWait(epoch)
511        }
512    }
513}
514
515pub type ReadOptions = StateStoreTestReadOptions;
516
517impl From<StateStoreTestReadOptions> for crate::store::ReadOptions {
518    fn from(val: StateStoreTestReadOptions) -> crate::store::ReadOptions {
519        crate::store::ReadOptions {
520            prefix_hint: val.prefix_hint,
521            prefetch_options: val.prefetch_options,
522            cache_policy: val.cache_policy,
523            retention_seconds: val.retention_seconds,
524        }
525    }
526}
527
528pub trait StateStoreReadTestExt: StateStore {
529    /// Point gets a value from the state store.
530    /// The result is based on a snapshot corresponding to the given `epoch`.
531    /// Both full key and the value are returned.
532    fn get_keyed_row(
533        &self,
534        key: TableKey<Bytes>,
535        epoch: u64,
536        read_options: ReadOptions,
537    ) -> impl StorageFuture<'_, Option<StateStoreKeyedRow>>;
538
539    /// Point gets a value from the state store.
540    /// The result is based on a snapshot corresponding to the given `epoch`.
541    /// Only the value is returned.
542    fn get(
543        &self,
544        key: TableKey<Bytes>,
545        epoch: u64,
546        read_options: ReadOptions,
547    ) -> impl StorageFuture<'_, Option<Bytes>> {
548        self.get_keyed_row(key, epoch, read_options)
549            .map_ok(|v| v.map(|(_, v)| v))
550    }
551
552    /// Opens and returns an iterator for given `prefix_hint` and `full_key_range`
553    /// Internally, `prefix_hint` will be used to for checking `bloom_filter` and
554    /// `full_key_range` used for iter. (if the `prefix_hint` not None, it should be be included
555    /// in `key_range`) The returned iterator will iterate data based on a snapshot
556    /// corresponding to the given `epoch`.
557    fn iter(
558        &self,
559        key_range: TableKeyRange,
560        epoch: u64,
561        read_options: ReadOptions,
562    ) -> impl StorageFuture<'_, <<Self as StateStore>::ReadSnapshot as StateStoreRead>::Iter>;
563
564    fn rev_iter(
565        &self,
566        key_range: TableKeyRange,
567        epoch: u64,
568        read_options: ReadOptions,
569    ) -> impl StorageFuture<'_, <<Self as StateStore>::ReadSnapshot as StateStoreRead>::RevIter>;
570
571    fn scan(
572        &self,
573        key_range: TableKeyRange,
574        epoch: u64,
575        limit: Option<usize>,
576        read_options: ReadOptions,
577    ) -> impl StorageFuture<'_, Vec<StateStoreKeyedRow>>;
578}
579
580impl<S: StateStore> StateStoreReadTestExt for S {
581    async fn get_keyed_row(
582        &self,
583        key: TableKey<Bytes>,
584        epoch: u64,
585        read_options: ReadOptions,
586    ) -> StorageResult<Option<StateStoreKeyedRow>> {
587        let snapshot = self
588            .new_read_snapshot(
589                read_options.get_read_epoch(epoch),
590                NewReadSnapshotOptions {
591                    table_id: read_options.table_id,
592                },
593            )
594            .await?;
595        snapshot
596            .on_key_value(key, read_options.into(), |key, value| {
597                Ok((key.copy_into(), Bytes::copy_from_slice(value)))
598            })
599            .await
600    }
601
602    async fn iter(
603        &self,
604        key_range: TableKeyRange,
605        epoch: u64,
606        read_options: ReadOptions,
607    ) -> StorageResult<<<Self as StateStore>::ReadSnapshot as StateStoreRead>::Iter> {
608        let snapshot = self
609            .new_read_snapshot(
610                read_options.get_read_epoch(epoch),
611                NewReadSnapshotOptions {
612                    table_id: read_options.table_id,
613                },
614            )
615            .await?;
616        snapshot.iter(key_range, read_options.into()).await
617    }
618
619    async fn rev_iter(
620        &self,
621        key_range: TableKeyRange,
622        epoch: u64,
623        read_options: ReadOptions,
624    ) -> StorageResult<<<Self as StateStore>::ReadSnapshot as StateStoreRead>::RevIter> {
625        let snapshot = self
626            .new_read_snapshot(
627                read_options.get_read_epoch(epoch),
628                NewReadSnapshotOptions {
629                    table_id: read_options.table_id,
630                },
631            )
632            .await?;
633        snapshot.rev_iter(key_range, read_options.into()).await
634    }
635
636    async fn scan(
637        &self,
638        key_range: TableKeyRange,
639        epoch: u64,
640        limit: Option<usize>,
641        read_options: ReadOptions,
642    ) -> StorageResult<Vec<StateStoreKeyedRow>> {
643        const MAX_INITIAL_CAP: usize = 1024;
644        let limit = limit.unwrap_or(usize::MAX);
645        let mut ret = Vec::with_capacity(min(limit, MAX_INITIAL_CAP));
646        let mut iter = self.iter(key_range, epoch, read_options).await?;
647        while let Some((key, value)) = iter.try_next().await? {
648            ret.push((key.copy_into(), Bytes::copy_from_slice(value)))
649        }
650        Ok(ret)
651    }
652}
653
654pub trait StateStoreGetTestExt: StateStoreGet {
655    fn get(
656        &self,
657        key: TableKey<Bytes>,
658        read_options: ReadOptions,
659    ) -> impl StorageFuture<'_, Option<Bytes>>;
660}
661
662impl<S: StateStoreGet> StateStoreGetTestExt for S {
663    async fn get(
664        &self,
665        key: TableKey<Bytes>,
666        read_options: ReadOptions,
667    ) -> StorageResult<Option<Bytes>> {
668        self.on_key_value(key, read_options.into(), |_, value| {
669            Ok(Bytes::copy_from_slice(value))
670        })
671        .await
672    }
673}