risingwave_storage/hummock/
test_utils.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::min;
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use bytes::Bytes;
20use foyer::{
21    Hint, HybridCache, HybridCacheBuilder, StorageKey as HybridKey, StorageValue as HybridValue,
22};
23use futures::TryFutureExt;
24use itertools::Itertools;
25use risingwave_common::catalog::{TableId, TableOption};
26use risingwave_common::config::EvictionConfig;
27use risingwave_common::hash::VirtualNode;
28use risingwave_common::util::epoch::test_epoch;
29use risingwave_common::util::row_serde::OrderedRowSerde;
30use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange, UserKey};
31use risingwave_hummock_sdk::key_range::KeyRange;
32use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
33use risingwave_hummock_sdk::{
34    EpochWithGap, HummockEpoch, HummockReadEpoch, HummockSstableObjectId,
35};
36
37use super::iterator::test_utils::iterator_test_table_key_of;
38use super::{
39    DEFAULT_RESTART_INTERVAL, HummockResult, InMemWriter, SstableMeta, SstableWriterOptions,
40};
41use crate::StateStore;
42use crate::compaction_catalog_manager::{
43    CompactionCatalogAgent, FilterKeyExtractorImpl, FullKeyFilterKeyExtractor,
44};
45use crate::error::StorageResult;
46use crate::hummock::shared_buffer::shared_buffer_batch::{
47    SharedBufferBatch, SharedBufferItem, SharedBufferValue,
48};
49use crate::hummock::value::HummockValue;
50use crate::hummock::{
51    BlockedXor16FilterBuilder, CachePolicy, DEFAULT_FILTER_HASH_PREALLOC_KEY_COUNT_CAP,
52    FilterBuilder, FilterBuilderOptions, LruCache, Sstable, SstableBuilder, SstableBuilderOptions,
53    SstableStoreRef, SstableWriter, TableHolder, Xor16FilterBuilder,
54};
55use crate::monitor::StoreLocalStatistic;
56use crate::opts::StorageOpts;
57use crate::storage_value::StorageValue;
58use crate::store::*;
59
60pub fn default_opts_for_test() -> StorageOpts {
61    StorageOpts {
62        sstable_size_mb: 4,
63        block_size_kb: 64,
64        bloom_false_positive: 0.1,
65        share_buffers_sync_parallelism: 2,
66        share_buffer_compaction_worker_threads_number: 1,
67        shared_buffer_capacity_mb: 64,
68        data_directory: "hummock_001".to_owned(),
69        write_conflict_detection_enabled: true,
70        block_cache_capacity_mb: 64,
71        meta_cache_capacity_mb: 64,
72        block_cache_eviction_config: EvictionConfig::for_test(),
73        disable_remote_compactor: false,
74        share_buffer_upload_concurrency: 1,
75        compactor_memory_limit_mb: 64,
76        sstable_id_remote_fetch_number: 1,
77        vector_file_block_size_kb: 8,
78        ..Default::default()
79    }
80}
81
82pub fn gen_dummy_batch(n: u64) -> Vec<SharedBufferItem> {
83    vec![(
84        TableKey(Bytes::from(iterator_test_table_key_of(n as usize))),
85        SharedBufferValue::Insert(Bytes::copy_from_slice(&b"value1"[..])),
86    )]
87}
88
89pub fn gen_dummy_batch_several_keys(n: usize) -> Vec<(TableKey<Bytes>, StorageValue)> {
90    let mut kvs = vec![];
91    let v = Bytes::from(b"value1".to_vec().repeat(100));
92    for idx in 0..n {
93        kvs.push((
94            TableKey(Bytes::from(iterator_test_table_key_of(idx))),
95            StorageValue::new_put(v.clone()),
96        ));
97    }
98    kvs
99}
100
101pub fn gen_dummy_sst_info(
102    id: u64,
103    batches: Vec<SharedBufferBatch>,
104    table_id: TableId,
105    epoch: HummockEpoch,
106) -> SstableInfo {
107    let mut min_table_key: Vec<u8> = batches[0].start_table_key().to_vec();
108    let mut max_table_key: Vec<u8> = batches[0].end_table_key().to_vec();
109    let mut file_size = 0;
110    for batch in batches.iter().skip(1) {
111        if min_table_key.as_slice() > *batch.start_table_key() {
112            min_table_key = batch.start_table_key().to_vec();
113        }
114        if max_table_key.as_slice() < *batch.end_table_key() {
115            max_table_key = batch.end_table_key().to_vec();
116        }
117        file_size += batch.size() as u64;
118    }
119    SstableInfoInner {
120        object_id: id.into(),
121        sst_id: id.into(),
122        key_range: KeyRange {
123            left: Bytes::from(FullKey::for_test(table_id, min_table_key, epoch).encode()),
124            right: Bytes::from(FullKey::for_test(table_id, max_table_key, epoch).encode()),
125            right_exclusive: false,
126        },
127        file_size,
128        table_ids: vec![table_id],
129        uncompressed_file_size: file_size,
130        min_epoch: epoch,
131        max_epoch: epoch,
132        sst_size: file_size,
133        ..Default::default()
134    }
135    .into()
136}
137
/// Number of keys in the table generated by `gen_default_test_sstable`.
pub const TEST_KEYS_COUNT: usize = 10_000;
140
141pub fn default_builder_opt_for_test() -> SstableBuilderOptions {
142    SstableBuilderOptions {
143        capacity: 256 * (1 << 20), // 256MB
144        block_capacity: 4096,      // 4KB
145        restart_interval: DEFAULT_RESTART_INTERVAL,
146        bloom_false_positive: 0.1,
147        ..Default::default()
148    }
149}
150
151pub fn default_writer_opt_for_test() -> SstableWriterOptions {
152    SstableWriterOptions {
153        capacity_hint: None,
154        tracker: None,
155        policy: CachePolicy::Disable,
156    }
157}
158
159pub fn mock_sst_writer(opt: &SstableBuilderOptions) -> InMemWriter {
160    InMemWriter::from(opt)
161}
162
163/// Generates sstable data and metadata from given `kv_iter`
164pub async fn gen_test_sstable_data(
165    opts: SstableBuilderOptions,
166    kv_iter: impl Iterator<Item = (FullKey<Vec<u8>>, HummockValue<Vec<u8>>)>,
167) -> (Bytes, SstableMeta) {
168    let table_id_to_vnode = HashMap::from_iter(vec![(
169        TableId::default().as_raw_id(),
170        VirtualNode::COUNT_FOR_TEST,
171    )]);
172    let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
173    let mut b = SstableBuilder::for_test(
174        0,
175        mock_sst_writer(&opts),
176        opts,
177        table_id_to_vnode,
178        table_id_to_watermark_serde,
179    );
180    for (key, value) in kv_iter {
181        b.add_for_test(key.to_ref(), value.as_slice())
182            .await
183            .unwrap();
184    }
185    let output = b.finish().await.unwrap();
186    output.writer_output
187}
188
189/// Write the data and meta to `sstable_store`.
190pub async fn put_sst(
191    sst_object_id: u64,
192    data: Bytes,
193    mut meta: SstableMeta,
194    sstable_store: SstableStoreRef,
195    mut options: SstableWriterOptions,
196    table_ids: Vec<u32>,
197) -> HummockResult<SstableInfo> {
198    options.policy = CachePolicy::NotFill;
199    let mut writer = sstable_store
200        .clone()
201        .create_sst_writer(sst_object_id, options);
202    for block_meta in &meta.block_metas {
203        let offset = block_meta.offset as usize;
204        let end_offset = offset + block_meta.len as usize;
205        writer
206            .write_block(&data[offset..end_offset], block_meta)
207            .await?;
208    }
209
210    // dummy
211    let bloom_filter = {
212        let mut filter_builder = BlockedXor16FilterBuilder::create(FilterBuilderOptions {
213            estimated_key_count: 0,
214            estimated_block_count: meta.block_metas.len(),
215            hash_prealloc_key_count_cap: DEFAULT_FILTER_HASH_PREALLOC_KEY_COUNT_CAP,
216        });
217        for _ in &meta.block_metas {
218            filter_builder.switch_block(None);
219        }
220
221        filter_builder.finish(None)
222    };
223
224    meta.meta_offset = writer.data_len() as u64;
225    meta.bloom_filter = bloom_filter;
226    let sst = SstableInfoInner {
227        object_id: sst_object_id.into(),
228        sst_id: sst_object_id.into(),
229        key_range: KeyRange {
230            left: Bytes::from(meta.smallest_key.clone()),
231            right: Bytes::from(meta.largest_key.clone()),
232            right_exclusive: false,
233        },
234        file_size: meta.estimated_size as u64,
235        meta_offset: meta.meta_offset,
236        uncompressed_file_size: meta.estimated_size as u64,
237        table_ids: table_ids.into_iter().map(Into::into).collect(),
238        ..Default::default()
239    }
240    .into();
241    let writer_output = writer.finish(meta).await?;
242    writer_output.await.unwrap()?;
243    Ok(sst)
244}
245
246/// Generates a test table from the given `kv_iter` and put the kv value to `sstable_store`
247pub async fn gen_test_sstable_impl<B: AsRef<[u8]> + Clone + Default + Eq, F: FilterBuilder>(
248    opts: SstableBuilderOptions,
249    object_id: u64,
250    kv_iter: impl IntoIterator<Item = (FullKey<B>, HummockValue<B>)>,
251    sstable_store: SstableStoreRef,
252    policy: CachePolicy,
253    table_id_to_vnode: HashMap<u32, usize>,
254    table_id_to_watermark_serde: HashMap<u32, Option<(OrderedRowSerde, OrderedRowSerde, usize)>>,
255) -> SstableInfo {
256    let writer_opts = SstableWriterOptions {
257        capacity_hint: None,
258        tracker: None,
259        policy,
260    };
261    let writer = sstable_store
262        .clone()
263        .create_sst_writer(object_id, writer_opts);
264
265    let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
266        FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
267        table_id_to_vnode
268            .into_iter()
269            .map(|(table_id, v)| (table_id.into(), v))
270            .collect(),
271        table_id_to_watermark_serde
272            .into_iter()
273            .map(|(table_id, v)| (table_id.into(), v))
274            .collect(),
275        HashMap::default(),
276    ));
277
278    let mut b = SstableBuilder::<_, F>::new(
279        object_id,
280        writer,
281        F::create(opts.filter_builder_options()),
282        opts,
283        compaction_catalog_agent_ref,
284        None,
285    );
286
287    let mut last_key = FullKey::<B>::default();
288    for (key, value) in kv_iter {
289        let is_new_user_key =
290            last_key.is_empty() || key.user_key.as_ref() != last_key.user_key.as_ref();
291        if is_new_user_key {
292            last_key = key.clone();
293        }
294
295        b.add(key.to_ref(), value.as_slice()).await.unwrap();
296    }
297    let output = b.finish().await.unwrap();
298    output.writer_output.await.unwrap().unwrap();
299    output.sst_info.sst_info
300}
301
302/// Generate a test table from the given `kv_iter` and put the kv value to `sstable_store`
303pub async fn gen_test_sstable<B: AsRef<[u8]> + Clone + Default + Eq>(
304    opts: SstableBuilderOptions,
305    object_id: u64,
306    kv_iter: impl Iterator<Item = (FullKey<B>, HummockValue<B>)>,
307    sstable_store: SstableStoreRef,
308) -> (TableHolder, SstableInfo) {
309    let table_id_to_vnode = HashMap::from_iter(vec![(
310        TableId::default().as_raw_id(),
311        VirtualNode::COUNT_FOR_TEST,
312    )]);
313
314    let table_id_to_watermark_serde =
315        HashMap::from_iter(vec![(TableId::default().as_raw_id(), None)]);
316
317    let sst_info = gen_test_sstable_impl::<_, Xor16FilterBuilder>(
318        opts,
319        object_id,
320        kv_iter,
321        sstable_store.clone(),
322        CachePolicy::NotFill,
323        table_id_to_vnode,
324        table_id_to_watermark_serde,
325    )
326    .await;
327
328    (
329        sstable_store
330            .sstable(&sst_info, &mut StoreLocalStatistic::default())
331            .await
332            .unwrap(),
333        sst_info,
334    )
335}
336
337pub async fn gen_test_sstable_with_table_ids<B: AsRef<[u8]> + Clone + Default + Eq>(
338    opts: SstableBuilderOptions,
339    object_id: u64,
340    kv_iter: impl Iterator<Item = (FullKey<B>, HummockValue<B>)>,
341    sstable_store: SstableStoreRef,
342    table_ids: Vec<u32>,
343) -> (TableHolder, SstableInfo) {
344    let table_id_to_vnode = table_ids
345        .iter()
346        .map(|table_id| (*table_id, VirtualNode::COUNT_FOR_TEST))
347        .collect();
348    let table_id_to_watermark_serde = table_ids.iter().map(|table_id| (*table_id, None)).collect();
349
350    let sst_info = gen_test_sstable_impl::<_, Xor16FilterBuilder>(
351        opts,
352        object_id,
353        kv_iter,
354        sstable_store.clone(),
355        CachePolicy::NotFill,
356        table_id_to_vnode,
357        table_id_to_watermark_serde,
358    )
359    .await;
360
361    (
362        sstable_store
363            .sstable(&sst_info, &mut StoreLocalStatistic::default())
364            .await
365            .unwrap(),
366        sst_info,
367    )
368}
369
370/// Generate a test table from the given `kv_iter` and put the kv value to `sstable_store`
371pub async fn gen_test_sstable_info<B: AsRef<[u8]> + Clone + Default + Eq>(
372    opts: SstableBuilderOptions,
373    object_id: u64,
374    kv_iter: impl IntoIterator<Item = (FullKey<B>, HummockValue<B>)>,
375    sstable_store: SstableStoreRef,
376) -> SstableInfo {
377    let table_id_to_vnode = HashMap::from_iter(vec![(
378        TableId::default().as_raw_id(),
379        VirtualNode::COUNT_FOR_TEST,
380    )]);
381
382    let table_id_to_watermark_serde =
383        HashMap::from_iter(vec![(TableId::default().as_raw_id(), None)]);
384
385    gen_test_sstable_impl::<_, BlockedXor16FilterBuilder>(
386        opts,
387        object_id,
388        kv_iter,
389        sstable_store,
390        CachePolicy::NotFill,
391        table_id_to_vnode,
392        table_id_to_watermark_serde,
393    )
394    .await
395}
396
397/// Generate a test table from the given `kv_iter` and put the kv value to `sstable_store`
398pub async fn gen_test_sstable_with_range_tombstone(
399    opts: SstableBuilderOptions,
400    object_id: u64,
401    kv_iter: impl Iterator<Item = (FullKey<Vec<u8>>, HummockValue<Vec<u8>>)>,
402    sstable_store: SstableStoreRef,
403) -> SstableInfo {
404    let table_id_to_vnode = HashMap::from_iter(vec![(
405        TableId::default().as_raw_id(),
406        VirtualNode::COUNT_FOR_TEST,
407    )]);
408
409    let table_id_to_watermark_serde =
410        HashMap::from_iter(vec![(TableId::default().as_raw_id(), None)]);
411
412    gen_test_sstable_impl::<_, Xor16FilterBuilder>(
413        opts,
414        object_id,
415        kv_iter,
416        sstable_store.clone(),
417        CachePolicy::Fill(Hint::Normal),
418        table_id_to_vnode,
419        table_id_to_watermark_serde,
420    )
421    .await
422}
423
424/// Generates a user key with table id 0 and the given `table_key`
425pub fn test_user_key(table_key: impl AsRef<[u8]>) -> UserKey<Vec<u8>> {
426    UserKey::for_test(TableId::default(), table_key.as_ref().to_vec())
427}
428
429/// Generates a user key with table id 0 and table key format of `key_test_{idx * 2}`
430pub fn test_user_key_of(idx: usize) -> UserKey<Vec<u8>> {
431    let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
432    table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
433    UserKey::for_test(TableId::default(), table_key)
434}
435
436/// Generates a full key with table id 0 and epoch 123. User key is created with `test_user_key_of`.
437pub fn test_key_of(idx: usize) -> FullKey<Vec<u8>> {
438    FullKey {
439        user_key: test_user_key_of(idx),
440        epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
441    }
442}
443
444/// The value of an index in the test table
445pub fn test_value_of(idx: usize) -> Vec<u8> {
446    "23332333"
447        .as_bytes()
448        .iter()
449        .cycle()
450        .cloned()
451        .take(idx % 100 + 1) // so that the table is not too big
452        .collect_vec()
453}
454
455/// Generates a test table used in almost all table-related tests. Developers may verify the
456/// correctness of their implementations by comparing the got value and the expected value
457/// generated by `test_key_of` and `test_value_of`.
458pub async fn gen_default_test_sstable(
459    opts: SstableBuilderOptions,
460    object_id: u64,
461    sstable_store: SstableStoreRef,
462) -> (TableHolder, SstableInfo) {
463    gen_test_sstable(
464        opts,
465        object_id,
466        (0..TEST_KEYS_COUNT).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
467        sstable_store,
468    )
469    .await
470}
471
472pub async fn count_stream(mut i: impl StateStoreIter) -> usize {
473    let mut c: usize = 0;
474    while i.try_next().await.unwrap().is_some() {
475        c += 1
476    }
477    c
478}
479
480pub fn create_small_table_cache() -> Arc<LruCache<HummockSstableObjectId, Box<Sstable>>> {
481    Arc::new(LruCache::new(1, 4, 0))
482}
483
484pub async fn hybrid_cache_for_test<K, V>() -> HybridCache<K, V>
485where
486    K: HybridKey,
487    V: HybridValue,
488{
489    HybridCacheBuilder::new()
490        .memory(10)
491        .storage()
492        .build()
493        .await
494        .unwrap()
495}
496
497#[derive(Default, Clone)]
498pub struct StateStoreTestReadOptions {
499    pub table_id: TableId,
500    pub prefix_hint: Option<Bytes>,
501    pub prefetch_options: PrefetchOptions,
502    pub cache_policy: CachePolicy,
503    pub read_committed: bool,
504    pub retention_seconds: Option<u32>,
505    pub read_version_from_backup: bool,
506}
507
508impl StateStoreTestReadOptions {
509    fn get_read_epoch(&self, epoch: u64) -> HummockReadEpoch {
510        if self.read_version_from_backup {
511            HummockReadEpoch::Backup(epoch)
512        } else if self.read_committed {
513            HummockReadEpoch::Committed(epoch)
514        } else {
515            HummockReadEpoch::NoWait(epoch)
516        }
517    }
518}
519
520pub type ReadOptions = StateStoreTestReadOptions;
521
522impl From<StateStoreTestReadOptions> for crate::store::ReadOptions {
523    fn from(val: StateStoreTestReadOptions) -> crate::store::ReadOptions {
524        crate::store::ReadOptions {
525            prefix_hint: val.prefix_hint,
526            prefetch_options: val.prefetch_options,
527            cache_policy: val.cache_policy,
528        }
529    }
530}
531
532pub trait StateStoreReadTestExt: StateStore {
533    /// Point gets a value from the state store.
534    /// The result is based on a snapshot corresponding to the given `epoch`.
535    /// Both full key and the value are returned.
536    fn get_keyed_row(
537        &self,
538        key: TableKey<Bytes>,
539        epoch: u64,
540        read_options: ReadOptions,
541    ) -> impl StorageFuture<'_, Option<StateStoreKeyedRow>>;
542
543    /// Point gets a value from the state store.
544    /// The result is based on a snapshot corresponding to the given `epoch`.
545    /// Only the value is returned.
546    fn get(
547        &self,
548        key: TableKey<Bytes>,
549        epoch: u64,
550        read_options: ReadOptions,
551    ) -> impl StorageFuture<'_, Option<Bytes>> {
552        self.get_keyed_row(key, epoch, read_options)
553            .map_ok(|v| v.map(|(_, v)| v))
554    }
555
556    /// Opens and returns an iterator for given `prefix_hint` and `full_key_range`
557    /// Internally, `prefix_hint` will be used for checking the SST filter and
558    /// `full_key_range` used for iter. (if the `prefix_hint` not None, it should be be included
559    /// in `key_range`) The returned iterator will iterate data based on a snapshot
560    /// corresponding to the given `epoch`.
561    fn iter(
562        &self,
563        key_range: TableKeyRange,
564        epoch: u64,
565        read_options: ReadOptions,
566    ) -> impl StorageFuture<'_, <<Self as StateStore>::ReadSnapshot as StateStoreRead>::Iter>;
567
568    fn rev_iter(
569        &self,
570        key_range: TableKeyRange,
571        epoch: u64,
572        read_options: ReadOptions,
573    ) -> impl StorageFuture<'_, <<Self as StateStore>::ReadSnapshot as StateStoreRead>::RevIter>;
574
575    fn scan(
576        &self,
577        key_range: TableKeyRange,
578        epoch: u64,
579        limit: Option<usize>,
580        read_options: ReadOptions,
581    ) -> impl StorageFuture<'_, Vec<StateStoreKeyedRow>>;
582}
583
584impl<S: StateStore> StateStoreReadTestExt for S {
585    async fn get_keyed_row(
586        &self,
587        key: TableKey<Bytes>,
588        epoch: u64,
589        read_options: ReadOptions,
590    ) -> StorageResult<Option<StateStoreKeyedRow>> {
591        let snapshot = self
592            .new_read_snapshot(
593                read_options.get_read_epoch(epoch),
594                NewReadSnapshotOptions {
595                    table_id: read_options.table_id,
596                    table_option: TableOption {
597                        retention_seconds: read_options.retention_seconds,
598                    },
599                },
600            )
601            .await?;
602        snapshot
603            .on_key_value(key, read_options.into(), |key, value| {
604                Ok((key.copy_into(), Bytes::copy_from_slice(value)))
605            })
606            .await
607    }
608
609    async fn iter(
610        &self,
611        key_range: TableKeyRange,
612        epoch: u64,
613        read_options: ReadOptions,
614    ) -> StorageResult<<<Self as StateStore>::ReadSnapshot as StateStoreRead>::Iter> {
615        let snapshot = self
616            .new_read_snapshot(
617                read_options.get_read_epoch(epoch),
618                NewReadSnapshotOptions {
619                    table_id: read_options.table_id,
620                    table_option: TableOption {
621                        retention_seconds: read_options.retention_seconds,
622                    },
623                },
624            )
625            .await?;
626        snapshot.iter(key_range, read_options.into()).await
627    }
628
629    async fn rev_iter(
630        &self,
631        key_range: TableKeyRange,
632        epoch: u64,
633        read_options: ReadOptions,
634    ) -> StorageResult<<<Self as StateStore>::ReadSnapshot as StateStoreRead>::RevIter> {
635        let snapshot = self
636            .new_read_snapshot(
637                read_options.get_read_epoch(epoch),
638                NewReadSnapshotOptions {
639                    table_id: read_options.table_id,
640                    table_option: TableOption {
641                        retention_seconds: read_options.retention_seconds,
642                    },
643                },
644            )
645            .await?;
646        snapshot.rev_iter(key_range, read_options.into()).await
647    }
648
649    async fn scan(
650        &self,
651        key_range: TableKeyRange,
652        epoch: u64,
653        limit: Option<usize>,
654        read_options: ReadOptions,
655    ) -> StorageResult<Vec<StateStoreKeyedRow>> {
656        const MAX_INITIAL_CAP: usize = 1024;
657        let limit = limit.unwrap_or(usize::MAX);
658        let mut ret = Vec::with_capacity(min(limit, MAX_INITIAL_CAP));
659        let mut iter = self.iter(key_range, epoch, read_options).await?;
660        while let Some((key, value)) = iter.try_next().await? {
661            ret.push((key.copy_into(), Bytes::copy_from_slice(value)))
662        }
663        Ok(ret)
664    }
665}
666
667pub trait StateStoreGetTestExt: StateStoreGet {
668    fn get(
669        &self,
670        key: TableKey<Bytes>,
671        read_options: ReadOptions,
672    ) -> impl StorageFuture<'_, Option<Bytes>>;
673}
674
675impl<S: StateStoreGet> StateStoreGetTestExt for S {
676    async fn get(
677        &self,
678        key: TableKey<Bytes>,
679        read_options: ReadOptions,
680    ) -> StorageResult<Option<Bytes>> {
681        self.on_key_value(key, read_options.into(), |_, value| {
682            Ok(Bytes::copy_from_slice(value))
683        })
684        .await
685    }
686}