risingwave_storage/
memory.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::cmp::Ordering;
16use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
17use std::ops::Bound::{Excluded, Included, Unbounded};
18use std::ops::{Bound, RangeBounds};
19use std::sync::{Arc, LazyLock};
20
21use bytes::Bytes;
22use itertools::Itertools;
23use parking_lot::RwLock;
24use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
25use risingwave_common::catalog::{TableId, TableOption};
26use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
27use risingwave_common::util::epoch::{EpochPair, MAX_EPOCH};
28use risingwave_hummock_sdk::key::{
29    FullKey, TableKey, TableKeyRange, UserKey, prefixed_range_with_vnode,
30};
31use risingwave_hummock_sdk::table_watermark::WatermarkDirection;
32use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
33use thiserror_ext::AsReport;
34use tokio::task::yield_now;
35use tracing::error;
36
37use crate::error::StorageResult;
38use crate::hummock::HummockError;
39use crate::hummock::utils::{
40    do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, merge_stream,
41    sanity_check_enabled,
42};
43use crate::mem_table::{KeyOp, MemTable};
44use crate::storage_value::StorageValue;
45use crate::store::*;
46
/// A full key (user key + epoch) whose table key is backed by shared [`Bytes`].
pub type BytesFullKey = FullKey<Bytes>;
/// A range over [`BytesFullKey`]s with inclusive/exclusive/unbounded ends.
pub type BytesFullKeyRange = (Bound<BytesFullKey>, Bound<BytesFullKey>);
49
/// An ordered key-value store keyed by [`BytesFullKey`], used as the backing
/// storage of [`RangeKvStateStore`]. Implemented by the in-memory
/// [`BTreeMapRangeKv`] and the persistent [`sled::SledRangeKv`].
#[allow(clippy::type_complexity)]
pub trait RangeKv: Clone + Send + Sync + 'static {
    /// Returns at most `limit` entries within `range`, in ascending key order.
    fn range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>>;

    /// Returns at most `limit` entries within `range`, in descending key order.
    fn rev_range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>>;

    /// Writes the given pairs; a `None` value records a tombstone (delete),
    /// as produced by `StorageValue::new_delete()` in `ingest_batch` below.
    fn ingest_batch(
        &self,
        kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
    ) -> StorageResult<()>;

    /// Persists pending writes, if the implementation buffers any.
    fn flush(&self) -> StorageResult<()>;
}
71
72pub type BTreeMapRangeKv = Arc<RwLock<BTreeMap<BytesFullKey, Option<Bytes>>>>;
73
74impl RangeKv for BTreeMapRangeKv {
75    fn range(
76        &self,
77        range: BytesFullKeyRange,
78        limit: Option<usize>,
79    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
80        let limit = limit.unwrap_or(usize::MAX);
81        Ok(self
82            .read()
83            .range(range)
84            .take(limit)
85            .map(|(key, value)| (key.clone(), value.clone()))
86            .collect())
87    }
88
89    fn rev_range(
90        &self,
91        range: BytesFullKeyRange,
92        limit: Option<usize>,
93    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
94        let limit = limit.unwrap_or(usize::MAX);
95        Ok(self
96            .read()
97            .range(range)
98            .rev()
99            .take(limit)
100            .map(|(key, value)| (key.clone(), value.clone()))
101            .collect())
102    }
103
104    fn ingest_batch(
105        &self,
106        kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
107    ) -> StorageResult<()> {
108        let mut inner = self.write();
109        for (key, value) in kv_pairs {
110            inner.insert(key, value);
111        }
112        Ok(())
113    }
114
115    fn flush(&self) -> StorageResult<()> {
116        Ok(())
117    }
118}
119
120pub mod sled {
121    use std::fs::create_dir_all;
122    use std::ops::RangeBounds;
123
124    use bytes::Bytes;
125    use risingwave_hummock_sdk::key::FullKey;
126
127    use crate::error::StorageResult;
128    use crate::memory::{BytesFullKey, BytesFullKeyRange, RangeKv, RangeKvStateStore};
129
    /// A [`RangeKv`] implementation persisted in a `sled` embedded database.
    #[derive(Clone)]
    pub struct SledRangeKv {
        // Handle to the underlying sled database; cheap to clone.
        inner: sled::Db,
    }

    impl SledRangeKv {
        /// Opens (or creates) a sled database at `path`.
        ///
        /// Panics if the database cannot be opened.
        pub fn new(path: impl AsRef<std::path::Path>) -> Self {
            SledRangeKv {
                inner: sled::open(path).expect("open"),
            }
        }

        /// Opens a sled database in a fresh temporary directory under
        /// `./.risingwave/sled`. `into_path` detaches the `TempDir` guard,
        /// so the directory is intentionally not removed on drop.
        pub fn new_temp() -> Self {
            create_dir_all("./.risingwave/sled").expect("should create");
            let path = tempfile::TempDir::new_in("./.risingwave/sled")
                .expect("find temp dir")
                .into_path();
            Self::new(path)
        }
    }
150
    // First byte of every stored sled value, tagging whether the logical value
    // is a tombstone (`EMPTY`) or real user data (`NON_EMPTY` + payload bytes).
    const EMPTY: u8 = 1;
    const NON_EMPTY: u8 = 0;
153
    impl RangeKv for SledRangeKv {
        /// Ascending scan. Sled is queried by the reverse-epoch byte encoding
        /// of the bounds; each decoded key is then re-checked against the
        /// logical `FullKey` bounds, because for variable-length table keys the
        /// encoded byte order can admit false positives (demonstrated by
        /// `test_filter_variable_key_length_false_positive` below).
        ///
        /// NOTE(review): false positives still consume the `take(limit)`
        /// budget, so a call may return fewer than `limit` genuine entries
        /// (possibly zero) while more exist beyond — callers like
        /// `batched_iter::Iter` would then stop early. Confirm whether the key
        /// distribution in practice rules this out.
        fn range(
            &self,
            range: BytesFullKeyRange,
            limit: Option<usize>,
        ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
            let (left, right) = range;
            // Logical bounds, used to filter out encoded-order false positives.
            let full_key_ref_bound = (
                left.as_ref().map(FullKey::to_ref),
                right.as_ref().map(FullKey::to_ref),
            );
            let left_encoded = left.as_ref().map(|key| key.to_ref().encode_reverse_epoch());
            let right_encoded = right
                .as_ref()
                .map(|key| key.to_ref().encode_reverse_epoch());
            let limit = limit.unwrap_or(usize::MAX);
            let mut ret = vec![];
            for result in self.inner.range((left_encoded, right_encoded)).take(limit) {
                let (key, value) = result?;
                let full_key = FullKey::decode_reverse_epoch(key.as_ref()).copy_into();
                // Drop keys that are inside the encoded range but outside the
                // logical one.
                if !full_key_ref_bound.contains(&full_key.to_ref()) {
                    continue;
                }
                // First byte tags the value: EMPTY = tombstone, NON_EMPTY = payload.
                let value = match value.as_ref() {
                    [EMPTY] => None,
                    [NON_EMPTY, rest @ ..] => Some(Bytes::from(Vec::from(rest))),
                    _ => unreachable!("malformed value: {:?}", value),
                };
                ret.push((full_key, value))
            }
            Ok(ret)
        }

        /// Descending scan; same false-positive filtering as `range`.
        fn rev_range(
            &self,
            range: BytesFullKeyRange,
            limit: Option<usize>,
        ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
            let (left, right) = range;
            let full_key_ref_bound = (
                left.as_ref().map(FullKey::to_ref),
                right.as_ref().map(FullKey::to_ref),
            );
            let left_encoded = left.as_ref().map(|key| key.to_ref().encode_reverse_epoch());
            let right_encoded = right
                .as_ref()
                .map(|key| key.to_ref().encode_reverse_epoch());
            let limit = limit.unwrap_or(usize::MAX);
            let mut ret = vec![];
            for result in self
                .inner
                .range((left_encoded, right_encoded))
                .rev()
                .take(limit)
            {
                let (key, value) = result?;
                let full_key = FullKey::decode_reverse_epoch(key.as_ref()).copy_into();
                if !full_key_ref_bound.contains(&full_key.to_ref()) {
                    continue;
                }
                let value = match value.as_ref() {
                    [EMPTY] => None,
                    [NON_EMPTY, rest @ ..] => Some(Bytes::from(Vec::from(rest))),
                    _ => unreachable!("malformed value: {:?}", value),
                };
                ret.push((full_key, value))
            }
            Ok(ret)
        }

        /// Atomically applies all pairs via a sled batch, encoding keys with
        /// `encode_reverse_epoch` and prefixing values with the EMPTY/NON_EMPTY tag.
        fn ingest_batch(
            &self,
            kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
        ) -> StorageResult<()> {
            let mut batch = sled::Batch::default();
            for (key, value) in kv_pairs {
                let encoded_key = key.encode_reverse_epoch();
                let key = sled::IVec::from(encoded_key);
                // One tag byte plus the payload, if any.
                let mut buffer =
                    Vec::with_capacity(value.as_ref().map(|v| v.len()).unwrap_or_default() + 1);
                if let Some(value) = value {
                    buffer.push(NON_EMPTY);
                    buffer.extend_from_slice(value.as_ref());
                } else {
                    buffer.push(EMPTY);
                }
                let value = sled::IVec::from(buffer);
                batch.insert(key, value);
            }
            self.inner.apply_batch(batch)?;
            Ok(())
        }

        /// Flushes sled's dirty pages to disk, discarding the byte count.
        fn flush(&self) -> StorageResult<()> {
            Ok(self.inner.flush().map(|_| {})?)
        }
    }
251
    /// A [`RangeKvStateStore`] backed by [`SledRangeKv`], i.e. persisted on disk.
    pub type SledStateStore = RangeKvStateStore<SledRangeKv>;

    impl SledStateStore {
        /// Opens a sled-backed state store at `path` with empty table state.
        pub fn new(path: impl AsRef<std::path::Path>) -> Self {
            RangeKvStateStore {
                inner: SledRangeKv::new(path),
                tables: Default::default(),
            }
        }

        /// Opens a sled-backed state store in a fresh temporary directory.
        pub fn new_temp() -> Self {
            RangeKvStateStore {
                inner: SledRangeKv::new_temp(),
                tables: Default::default(),
            }
        }
    }
269
    #[cfg(test)]
    mod test {
        use std::ops::{Bound, RangeBounds};

        use bytes::Bytes;
        use risingwave_common::catalog::TableId;
        use risingwave_common::util::epoch::EPOCH_SPILL_TIME_MASK;
        use risingwave_hummock_sdk::EpochWithGap;
        use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};

        use crate::memory::RangeKv;
        use crate::memory::sled::SledRangeKv;

        /// Shows that the reverse-epoch byte encoding does not preserve
        /// `FullKey` order for variable-length table keys: a short key that is
        /// logically outside the bounds still falls inside the encoded bounds.
        /// `SledRangeKv::range` must therefore re-filter decoded keys, which
        /// this test verifies end-to-end.
        #[test]
        fn test_filter_variable_key_length_false_positive() {
            let table_id = TableId { table_id: 233 };
            let epoch = u64::MAX - u64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]);
            // Shorter than the lower bound, so logically excluded...
            let excluded_short_table_key = [0, 1, 0, 0];
            let included_long_table_key = [0, 1, 0, 0, 1, 2];
            let left_table_key = [0, 1, 0, 0, 1];
            let right_table_key = [0, 1, 1, 1];

            let to_full_key = |table_key: &[u8]| FullKey {
                user_key: UserKey {
                    table_id,
                    table_key: TableKey(Bytes::from(table_key.to_vec())),
                },
                epoch_with_gap: EpochWithGap::new_from_epoch(epoch & !EPOCH_SPILL_TIME_MASK),
            };

            let left_full_key = to_full_key(&left_table_key[..]);
            let right_full_key = to_full_key(&right_table_key[..]);
            let included_long_full_key = to_full_key(&included_long_table_key[..]);
            let excluded_short_full_key = to_full_key(&excluded_short_table_key[..]);

            // Logical (FullKey) bounds: long key in, short key out.
            assert!(
                (
                    Bound::Included(left_full_key.to_ref()),
                    Bound::Included(right_full_key.to_ref())
                )
                    .contains(&included_long_full_key.to_ref())
            );
            assert!(
                !(
                    Bound::Included(left_full_key.to_ref()),
                    Bound::Included(right_full_key.to_ref())
                )
                    .contains(&excluded_short_full_key.to_ref())
            );

            let left_encoded = left_full_key.encode_reverse_epoch();
            let right_encoded = right_full_key.encode_reverse_epoch();

            // Encoded (byte-order) bounds: BOTH keys fall inside — the short
            // key is a false positive of the encoding.
            assert!(
                (
                    Bound::Included(left_encoded.clone()),
                    Bound::Included(right_encoded.clone())
                )
                    .contains(&included_long_full_key.encode_reverse_epoch())
            );
            assert!(
                (
                    Bound::Included(left_encoded),
                    Bound::Included(right_encoded)
                )
                    .contains(&excluded_short_full_key.encode_reverse_epoch())
            );

            // The store must filter the false positive out of range results.
            let sled_range_kv = SledRangeKv::new_temp();
            sled_range_kv
                .ingest_batch(
                    vec![
                        (included_long_full_key.clone(), None),
                        (excluded_short_full_key, None),
                    ]
                    .into_iter(),
                )
                .unwrap();
            let kvs = sled_range_kv
                .range(
                    (
                        Bound::Included(left_full_key),
                        Bound::Included(right_full_key),
                    ),
                    None,
                )
                .unwrap();
            assert_eq!(1, kvs.len());
            assert_eq!(included_long_full_key.to_ref(), kvs[0].0.to_ref());
            assert!(kvs[0].1.is_none());
        }
    }
362}
363
364mod batched_iter {
365
366    use super::*;
367
    /// A utility struct for iterating over a range of keys in a locked `BTreeMap`, which will batch
    /// some records to make a trade-off between the copying overhead and the times of acquiring
    /// the lock.
    ///
    /// Therefore, it's not guaranteed that we're iterating over a consistent snapshot of the map.
    /// Users should handle MVCC by themselves.
    pub struct Iter<R: RangeKv> {
        // The underlying store batches are fetched from.
        inner: R,
        // The remaining key range; shrunk past each fetched batch by `refill`.
        range: BytesFullKeyRange,
        // Buffered records of the most recently fetched batch.
        current: std::vec::IntoIter<(FullKey<Bytes>, Option<Bytes>)>,
        // Whether to iterate in descending key order.
        rev: bool,
    }

    impl<R: RangeKv> Iter<R> {
        /// Creates a batched iterator over `range`, descending when `rev` is true.
        /// No data is fetched until the first call to `next`.
        pub fn new(inner: R, range: BytesFullKeyRange, rev: bool) -> Self {
            Self {
                inner,
                range,
                rev,
                current: Vec::new().into_iter(),
            }
        }
    }
391
392    impl<R: RangeKv> Iter<R> {
393        const BATCH_SIZE: usize = 256;
394
395        /// Get the next batch of records and fill the `current` buffer.
396        fn refill(&mut self) -> StorageResult<()> {
397            assert!(self.current.is_empty());
398
399            let batch = if self.rev {
400                self.inner.rev_range(
401                    (self.range.0.clone(), self.range.1.clone()),
402                    Some(Self::BATCH_SIZE),
403                )?
404            } else {
405                self.inner.range(
406                    (self.range.0.clone(), self.range.1.clone()),
407                    Some(Self::BATCH_SIZE),
408                )?
409            };
410
411            if let Some((last_key, _)) = batch.last() {
412                let full_key = FullKey::new_with_gap_epoch(
413                    last_key.user_key.table_id,
414                    TableKey(last_key.user_key.table_key.0.clone()),
415                    last_key.epoch_with_gap,
416                );
417                if self.rev {
418                    self.range.1 = Bound::Excluded(full_key);
419                } else {
420                    self.range.0 = Bound::Excluded(full_key);
421                }
422            }
423            self.current = batch.into_iter();
424            Ok(())
425        }
426    }
427
428    impl<R: RangeKv> Iter<R> {
429        #[allow(clippy::type_complexity)]
430        pub fn next(&mut self) -> StorageResult<Option<(BytesFullKey, Option<Bytes>)>> {
431            match self.current.next() {
432                Some((key, value)) => Ok(Some((key, value))),
433                None => {
434                    self.refill()?;
435                    Ok(self.current.next())
436                }
437            }
438        }
439    }
440
    #[cfg(test)]
    mod tests {
        use rand::Rng;

        use super::*;
        use crate::memory::sled::SledRangeKv;

        #[test]
        fn test_btreemap_iter_chaos() {
            let map = Arc::new(RwLock::new(BTreeMap::new()));
            test_iter_chaos_inner(map, 1000);
        }

        #[cfg(not(madsim))]
        #[test]
        fn test_sled_iter_chaos() {
            let map = SledRangeKv::new_temp();
            test_iter_chaos_inner(map, 100);
        }

        /// Fuzzes `Iter` against a direct `RangeKv::range` call with `count`
        /// random ranges over 10000 sequential keys, expecting identical output.
        fn test_iter_chaos_inner(map: impl RangeKv, count: usize) {
            let key_range = 1..=10000;
            // Zero-padded so lexicographic order matches numeric order.
            let num_to_bytes = |k: i32| Bytes::from(format!("{:06}", k).as_bytes().to_vec());
            let num_to_full_key =
                |k: i32| FullKey::new(TableId::default(), TableKey(num_to_bytes(k)), 0);
            #[allow(clippy::mutable_key_type)]
            map.ingest_batch(key_range.clone().map(|k| {
                let key = num_to_full_key(k);
                let b = key.user_key.table_key.0.clone();

                (key, Some(b))
            }))
            .unwrap();

            // Picks a random included/excluded/unbounded bound over the key space.
            let rand_bound = || {
                let key = rand::rng().random_range(key_range.clone());
                let key = num_to_full_key(key);
                match rand::rng().random_range(1..=5) {
                    1 | 2 => Bound::Included(key),
                    3 | 4 => Bound::Excluded(key),
                    _ => Bound::Unbounded,
                }
            };

            for _ in 0..count {
                let range = loop {
                    let range = (rand_bound(), rand_bound());
                    let (start, end) = (range.start_bound(), range.end_bound());

                    // Filter out invalid ranges. Code migrated from `BTreeMap::range`.
                    match (start, end) {
                        (Bound::Excluded(s), Bound::Excluded(e)) if s == e => {
                            continue;
                        }
                        (
                            Bound::Included(s) | Bound::Excluded(s),
                            Bound::Included(e) | Bound::Excluded(e),
                        ) if s > e => {
                            continue;
                        }
                        _ => break range,
                    }
                };

                let v1 = {
                    let mut v = vec![];
                    let mut iter = Iter::new(map.clone(), range.clone(), false);
                    while let Some((key, value)) = iter.next().unwrap() {
                        v.push((key, value));
                    }
                    v
                };
                let v2 = map.range(range, None).unwrap();

                // Items iterated from the batched iterator should be the same as the normal iterator.
                assert_eq!(v1, v2);
            }
        }
    }
520}
521
/// The purely in-memory state store, backed by [`BTreeMapRangeKv`].
pub type MemoryStateStore = RangeKvStateStore<BTreeMapRangeKv>;
523
/// Per-table epoch bookkeeping used to emulate epoch sealing/waiting for the
/// in-memory backend.
struct TableState {
    // Epoch the table was initialized with; `wait_epoch` asserts no smaller
    // epoch is ever waited on.
    init_epoch: u64,
    // Maps `prev_epoch` -> `curr_epoch`, per the `tables` field doc on
    // `RangeKvStateStore`; consulted by `StateStoreReadLog::next_epoch`.
    next_epochs: BTreeMap<u64, u64>,
    // Greatest epoch sealed so far, if any; `wait_epoch` returns once it
    // reaches the waited epoch.
    latest_sealed_epoch: Option<u64>,
    // Epoch -> builder of vnode bitmap. NOTE(review): appears to track which
    // vnodes have sealed each in-flight epoch — confirm against the sealing
    // path, which is outside this view.
    sealing_epochs: BTreeMap<u64, BitmapBuilder>,
}
530
impl TableState {
    /// Creates bookkeeping for a table initialized at `init_epoch`.
    fn new(init_epoch: u64) -> Self {
        Self {
            init_epoch,
            next_epochs: Default::default(),
            latest_sealed_epoch: None,
            sealing_epochs: Default::default(),
        }
    }

    /// Waits until `epoch` becomes visible for `table_id`: either `epoch` is
    /// the table's init epoch, or some epoch >= `epoch` has been sealed.
    ///
    /// Busy-waits with cooperative `yield_now`, re-checking under the lock on
    /// each round. Panics if the table is unknown or `epoch` precedes the
    /// table's init epoch.
    async fn wait_epoch(
        tables: &parking_lot::Mutex<HashMap<TableId, Self>>,
        table_id: TableId,
        epoch: u64,
    ) {
        loop {
            {
                // Scope the lock guard so it is dropped before the await below.
                let tables = tables.lock();
                let table_state = tables.get(&table_id).expect("should exist");
                assert!(epoch >= table_state.init_epoch);
                if epoch == table_state.init_epoch {
                    return;
                }
                if let Some(latest_sealed_epoch) = table_state.latest_sealed_epoch
                    && latest_sealed_epoch >= epoch
                {
                    return;
                }
            }
            yield_now().await;
        }
    }
}
564
/// An in-memory state store
///
/// The in-memory state store is a [`BTreeMap`], which maps [`FullKey`] to value. It
/// never does GC, so the memory usage will be high. Therefore, in-memory state store should never
/// be used in production.
#[derive(Clone, Default)]
pub struct RangeKvStateStore<R: RangeKv> {
    /// Stores (key, epoch) -> user value.
    inner: R,
    /// `table_id` -> `prev_epoch` -> `curr_epoch`
    ///
    /// Epoch bookkeeping shared by all clones of this store; see [`TableState`].
    tables: Arc<parking_lot::Mutex<HashMap<TableId, TableState>>>,
}
577
578fn to_full_key_range<R, B>(table_id: TableId, table_key_range: R) -> BytesFullKeyRange
579where
580    R: RangeBounds<B> + Send,
581    B: AsRef<[u8]>,
582{
583    let start = match table_key_range.start_bound() {
584        Included(k) => Included(FullKey::new(
585            table_id,
586            TableKey(Bytes::from(k.as_ref().to_vec())),
587            HummockEpoch::MAX,
588        )),
589        Excluded(k) => Excluded(FullKey::new(
590            table_id,
591            TableKey(Bytes::from(k.as_ref().to_vec())),
592            0,
593        )),
594        Unbounded => Included(FullKey::new(
595            table_id,
596            TableKey(Bytes::from(b"".to_vec())),
597            HummockEpoch::MAX,
598        )),
599    };
600    let end = match table_key_range.end_bound() {
601        Included(k) => Included(FullKey::new(
602            table_id,
603            TableKey(Bytes::from(k.as_ref().to_vec())),
604            0,
605        )),
606        Excluded(k) => Excluded(FullKey::new(
607            table_id,
608            TableKey(Bytes::from(k.as_ref().to_vec())),
609            HummockEpoch::MAX,
610        )),
611        Unbounded => {
612            if let Some(next_table_id) = table_id.table_id().checked_add(1) {
613                Excluded(FullKey::new(
614                    next_table_id.into(),
615                    TableKey(Bytes::from(b"".to_vec())),
616                    HummockEpoch::MAX,
617                ))
618            } else {
619                Unbounded
620            }
621        }
622    };
623    (start, end)
624}
625
impl MemoryStateStore {
    /// Creates a new, empty in-memory state store.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns a handle to a single process-wide shared store instance
    /// (all callers observe the same data).
    pub fn shared() -> Self {
        static STORE: LazyLock<MemoryStateStore> = LazyLock::new(MemoryStateStore::new);
        STORE.clone()
    }
}
636
impl<R: RangeKv> RangeKvStateStore<R> {
    /// Scans `key_range` of `table_id` as of `epoch`, returning at most
    /// `limit` `(encoded_full_key, value)` pairs.
    ///
    /// For each user key, only its newest version with epoch <= `epoch` is
    /// considered (versions of a user key arrive newest-first); if that
    /// version is a tombstone (`None`), the user key is omitted entirely.
    fn scan(
        &self,
        key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(Bytes, Bytes)>> {
        let mut data = vec![];
        if limit == Some(0) {
            return Ok(vec![]);
        }
        // User key whose visible version has already been decided.
        let mut last_user_key = None;
        for (key, value) in self
            .inner
            .range(to_full_key_range(table_id, key_range), None)?
        {
            // Version newer than the read epoch: invisible to this scan.
            if key.epoch_with_gap.pure_epoch() > epoch {
                continue;
            }
            // First visible version of a new user key decides its value.
            if Some(&key.user_key) != last_user_key.as_ref() {
                if let Some(value) = value {
                    data.push((Bytes::from(key.encode()), value.clone()));
                }
                last_user_key = Some(key.user_key.clone());
            }
            if let Some(limit) = limit
                && data.len() >= limit
            {
                break;
            }
        }
        Ok(data)
    }
}
672
/// A read-only view of a [`RangeKvStateStore`] pinned to a fixed epoch and table.
#[derive(Clone)]
pub struct RangeKvStateStoreReadSnapshot<R: RangeKv> {
    // The shared underlying store (cheap to clone).
    inner: RangeKvStateStore<R>,
    // Epoch all reads through this snapshot are performed at.
    epoch: u64,
    // Table this snapshot is scoped to.
    table_id: TableId,
}
679
impl<R: RangeKv> StateStoreRead for RangeKvStateStoreReadSnapshot<R> {
    type Iter = RangeKvStateStoreIter<R>;
    type RevIter = RangeKvStateStoreRevIter<R>;

    /// Point get, delegated to the store at this snapshot's pinned epoch/table.
    async fn get_keyed_row(
        &self,
        key: TableKey<Bytes>,
        _read_options: ReadOptions,
    ) -> StorageResult<Option<StateStoreKeyedRow>> {
        self.inner
            .get_keyed_row_impl(key, self.epoch, self.table_id)
    }

    /// Forward range scan at the pinned epoch/table.
    async fn iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::Iter> {
        self.inner.iter_impl(key_range, self.epoch, self.table_id)
    }

    /// Backward range scan at the pinned epoch/table.
    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter> {
        self.inner
            .rev_iter_impl(key_range, self.epoch, self.table_id)
    }
}
710
711impl<R: RangeKv> RangeKvStateStore<R> {
712    fn get_keyed_row_impl(
713        &self,
714        key: TableKey<Bytes>,
715        epoch: u64,
716        table_id: TableId,
717    ) -> StorageResult<Option<StateStoreKeyedRow>> {
718        let range_bounds = (Bound::Included(key.clone()), Bound::Included(key));
719        // We do not really care about vnodes here, so we just use the default value.
720        let res = self.scan(range_bounds, epoch, table_id, Some(1))?;
721
722        Ok(match res.as_slice() {
723            [] => None,
724            [(key, value)] => Some((
725                FullKey::decode(key.as_ref()).to_vec().into_bytes(),
726                value.clone(),
727            )),
728            _ => unreachable!(),
729        })
730    }
731
732    fn iter_impl(
733        &self,
734        key_range: TableKeyRange,
735        epoch: u64,
736        table_id: TableId,
737    ) -> StorageResult<RangeKvStateStoreIter<R>> {
738        Ok(RangeKvStateStoreIter::new(
739            batched_iter::Iter::new(
740                self.inner.clone(),
741                to_full_key_range(table_id, key_range),
742                false,
743            ),
744            epoch,
745            true,
746        ))
747    }
748
749    fn rev_iter_impl(
750        &self,
751        key_range: TableKeyRange,
752        epoch: u64,
753        table_id: TableId,
754    ) -> StorageResult<RangeKvStateStoreRevIter<R>> {
755        Ok(RangeKvStateStoreRevIter::new(
756            batched_iter::Iter::new(
757                self.inner.clone(),
758                to_full_key_range(table_id, key_range),
759                true,
760            ),
761            epoch,
762            true,
763        ))
764    }
765}
766
impl<R: RangeKv> StateStoreReadLog for RangeKvStateStore<R> {
    type ChangeLogIter = RangeKvStateStoreChangeLogIter<R>;

    /// Resolves the epoch that follows `epoch` for `options.table_id`,
    /// busy-waiting with cooperative yields until the successor is registered
    /// in the table's `next_epochs` map.
    ///
    /// Returns an error if the table is unknown to this store.
    async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
        loop {
            {
                // Scope the lock so it is not held across the await below.
                let tables = self.tables.lock();
                let Some(tables) = tables.get(&options.table_id) else {
                    return Err(HummockError::next_epoch(format!(
                        "table {} not exist",
                        options.table_id
                    ))
                    .into());
                };
                if let Some(next_epoch) = tables.next_epochs.get(&epoch) {
                    break Ok(*next_epoch);
                }
            }
            yield_now().await;
        }
    }

    /// Builds a change-log iterator over `key_range`: new values are read as
    /// of `max_epoch` (with values), old values as of `min_epoch` (keys only,
    /// per the final `false` flag).
    async fn iter_log(
        &self,
        (min_epoch, max_epoch): (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> StorageResult<Self::ChangeLogIter> {
        let new_value_iter = RangeKvStateStoreIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(options.table_id, key_range.clone()),
                false,
            ),
            max_epoch,
            true,
        );
        let old_value_iter = RangeKvStateStoreIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(options.table_id, key_range),
                false,
            ),
            min_epoch,
            false,
        );
        RangeKvStateStoreChangeLogIter::new(new_value_iter, old_value_iter)
    }
}
816
impl<R: RangeKv> RangeKvStateStore<R> {
    /// Creates a read snapshot pinned at `epoch` for `table_id`.
    fn new_read_snapshot_impl(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> RangeKvStateStoreReadSnapshot<R> {
        RangeKvStateStoreReadSnapshot {
            inner: self.clone(),
            epoch,
            table_id,
        }
    }

    /// Writes `kv_pairs` — plus a tombstone for every existing key covered by
    /// `delete_ranges` — at `write_options.epoch`, returning the total size of
    /// ingested keys and values in bytes.
    pub(crate) fn ingest_batch(
        &self,
        mut kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
        delete_ranges: Vec<(Bound<Bytes>, Bound<Bytes>)>,
        write_options: WriteOptions,
    ) -> StorageResult<usize> {
        let epoch = write_options.epoch;

        // Expand delete ranges into per-key tombstones by collecting the
        // distinct user keys currently stored within each range.
        let mut delete_keys = BTreeSet::new();
        for del_range in delete_ranges {
            // NOTE(review): the scan bounds embed `epoch` into the boundary
            // full keys, so versions of the exact boundary keys at other
            // epochs may fall just outside the scanned range — confirm this
            // matches the intended delete-range semantics.
            for (key, _) in self.inner.range(
                (
                    del_range.0.map(|table_key| {
                        FullKey::new(write_options.table_id, TableKey(table_key), epoch)
                    }),
                    del_range.1.map(|table_key| {
                        FullKey::new(write_options.table_id, TableKey(table_key), epoch)
                    }),
                ),
                None,
            )? {
                delete_keys.insert(key.user_key.table_key);
            }
        }
        for key in delete_keys {
            kv_pairs.push((key, StorageValue::new_delete()));
        }

        // Ingest everything at `epoch`; the mapping closure accumulates the
        // byte size as a side effect while the store consumes the iterator.
        let mut size = 0;
        self.inner
            .ingest_batch(kv_pairs.into_iter().map(|(key, value)| {
                size += key.len() + value.size();
                (
                    FullKey::new(write_options.table_id, key, epoch),
                    value.user_value,
                )
            }))?;
        Ok(size)
    }
}
870
impl<R: RangeKv> StateStore for RangeKvStateStore<R> {
    type Local = RangeKvLocalStateStore<R>;
    type ReadSnapshot = RangeKvStateStoreReadSnapshot<R>;

    async fn try_wait_epoch(
        &self,
        _epoch: HummockReadEpoch,
        _options: TryWaitEpochOptions,
    ) -> StorageResult<()> {
        // memory backend doesn't need to wait for epoch, so this is a no-op.
        Ok(())
    }

    /// Creates a local store that stages writes for a single table/executor.
    async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
        RangeKvLocalStateStore::new(self.clone(), option)
    }

    /// Creates a read snapshot pinned at the epoch resolved from `epoch`.
    async fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> StorageResult<Self::ReadSnapshot> {
        Ok(self.new_read_snapshot_impl(epoch.get_epoch(), options.table_id))
    }
}
896
/// A local (per-executor) state store over a [`RangeKvStateStore`], staging
/// uncommitted mutations in a [`MemTable`].
pub struct RangeKvLocalStateStore<R: RangeKv> {
    // Staged, not-yet-flushed mutations; consulted before the global store on reads.
    mem_table: MemTable,
    // The shared global store flushed data is written to and read from.
    inner: RangeKvStateStore<R>,

    // `None` until the local store is initialized with its first epoch pair.
    epoch: Option<EpochPair>,

    table_id: TableId,
    op_consistency_level: OpConsistencyLevel,
    table_option: TableOption,
    // Vnode bitmap owned by this local store. NOTE(review): presumably used
    // for watermark/sanity checks further down this impl (outside this view)
    // — confirm.
    vnodes: Arc<Bitmap>,
}
908
909impl<R: RangeKv> RangeKvLocalStateStore<R> {
910    pub fn new(inner: RangeKvStateStore<R>, option: NewLocalOptions) -> Self {
911        Self {
912            inner,
913            mem_table: MemTable::new(option.op_consistency_level.clone()),
914            epoch: None,
915            table_id: option.table_id,
916            op_consistency_level: option.op_consistency_level,
917            table_option: option.table_option,
918            vnodes: option.vnodes,
919        }
920    }
921}
922
impl<R: RangeKv> LocalStateStore for RangeKvLocalStateStore<R> {
    type FlushedSnapshotReader = RangeKvStateStoreReadSnapshot<R>;

    type Iter<'a> = impl StateStoreIter + 'a;
    type RevIter<'a> = impl StateStoreIter + 'a;

    /// Point lookup that consults the uncommitted mem-table first and falls
    /// back to the shared store at the current epoch.
    async fn get(
        &self,
        key: TableKey<Bytes>,
        _read_options: ReadOptions,
    ) -> StorageResult<Option<Bytes>> {
        match self.mem_table.buffer.get(&key) {
            None => self
                .inner
                .get_keyed_row_impl(key, self.epoch(), self.table_id)
                .map(|option| option.map(|(_, value)| value)),
            Some(op) => match op {
                // A staged insert/update shadows whatever is committed.
                KeyOp::Insert(value) | KeyOp::Update((_, value)) => Ok(Some(value.clone())),
                // A staged delete hides the committed value.
                KeyOp::Delete(_) => Ok(None),
            },
        }
    }

    /// Forward iteration over `key_range`, merging staged mem-table entries
    /// with the committed data of the shared store.
    async fn iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::Iter<'_>> {
        let iter = self
            .inner
            .iter_impl(key_range.clone(), self.epoch(), self.table_id)?;
        Ok(FromStreamStateStoreIter::new(Box::pin(merge_stream(
            self.mem_table.iter(key_range),
            iter.into_stream(to_owned_item),
            self.table_id,
            self.epoch(),
            false,
        ))))
    }

    /// Reverse-order counterpart of [`Self::iter`].
    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter<'_>> {
        let iter = self
            .inner
            .rev_iter_impl(key_range.clone(), self.epoch(), self.table_id)?;
        Ok(FromStreamStateStoreIter::new(Box::pin(merge_stream(
            self.mem_table.rev_iter(key_range),
            iter.into_stream(to_owned_item),
            self.table_id,
            self.epoch(),
            true,
        ))))
    }

    /// Stages an insert (or, when `old_val` is provided, an update) in the
    /// mem-table; nothing reaches the shared store until `flush`.
    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()> {
        match old_val {
            None => self.mem_table.insert(key, new_val)?,
            Some(old_val) => self.mem_table.update(key, old_val, new_val)?,
        };
        Ok(())
    }

    /// Stages a delete of `key` (with expected current value `old_val`) in the
    /// mem-table.
    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
        Ok(self.mem_table.delete(key, old_val)?)
    }

    /// Drains the mem-table and ingests every staged operation into the shared
    /// store at the current epoch. Returns the total ingested key+value size.
    async fn flush(&mut self) -> StorageResult<usize> {
        let buffer = self.mem_table.drain().into_parts();
        let mut kv_pairs = Vec::with_capacity(buffer.len());
        // When sanity checks are enabled, validate each staged op against the
        // latest committed data (a snapshot read at `MAX_EPOCH`).
        let sanity_check_read_snapshot = if sanity_check_enabled() {
            Some(self.inner.new_read_snapshot_impl(MAX_EPOCH, self.table_id))
        } else {
            None
        };
        for (key, key_op) in buffer {
            match key_op {
                // Currently, some executors do not strictly comply with these semantics. As
                // a workaround you may disable the check by initializing the
                // state store with `op_consistency_level=Inconsistent`.
                KeyOp::Insert(value) => {
                    if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
                        do_insert_sanity_check(
                            &key,
                            &value,
                            sanity_check_read_snapshot,
                            self.table_id,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, StorageValue::new_put(value)));
                }
                KeyOp::Delete(old_value) => {
                    if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
                        do_delete_sanity_check(
                            &key,
                            &old_value,
                            sanity_check_read_snapshot,
                            self.table_id,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, StorageValue::new_delete()));
                }
                KeyOp::Update((old_value, new_value)) => {
                    if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
                        do_update_sanity_check(
                            &key,
                            &old_value,
                            &new_value,
                            sanity_check_read_snapshot,
                            self.table_id,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, StorageValue::new_put(new_value)));
                }
            }
        }
        self.inner.ingest_batch(
            kv_pairs,
            vec![],
            WriteOptions {
                epoch: self.epoch(),
                table_id: self.table_id,
            },
        )
    }

    /// Current epoch. Panics if `init` has not been called yet.
    fn epoch(&self) -> u64 {
        self.epoch.expect("should have set the epoch").curr
    }

    /// Whether the mem-table holds unflushed operations.
    fn is_dirty(&self) -> bool {
        self.mem_table.is_dirty()
    }

    /// Binds this local store to its initial epoch pair and registers the
    /// prev→curr transition in the shared per-table state. For multi-vnode
    /// tables, waits until the previous epoch is fully sealed across all
    /// local stores of the table before returning.
    async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
        assert_eq!(
            self.epoch.replace(options.epoch),
            None,
            "epoch in local state store of table id {:?} is init for more than once",
            self.table_id
        );
        self.inner
            .tables
            .lock()
            .entry(self.table_id)
            .or_insert_with(|| TableState::new(options.epoch.prev))
            .next_epochs
            .insert(options.epoch.prev, options.epoch.curr);
        if self.vnodes.len() > 1 {
            TableState::wait_epoch(&self.inner.tables, self.table_id, options.epoch.prev).await;
        }

        Ok(())
    }

    /// Seals the current epoch and advances to `next_epoch`.
    ///
    /// For multi-vnode tables, marks this store's vnodes as having sealed the
    /// previous epoch; once every vnode of the table has done so, the previous
    /// epoch becomes the table's latest fully-sealed epoch. Table watermarks,
    /// if provided, are converted into per-vnode delete ranges and written to
    /// the shared store.
    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
        // Sealing with unflushed writes would silently drop them.
        assert!(!self.is_dirty());
        if let Some(value_checker) = opts.switch_op_consistency_level {
            self.mem_table.op_consistency_level.update(&value_checker);
        }
        let epoch = self
            .epoch
            .as_mut()
            .expect("should have init epoch before seal the first epoch");
        let prev_epoch = epoch.curr;
        epoch.prev = prev_epoch;
        epoch.curr = next_epoch;
        // Guard against non-monotonic epoch advancement (panics either way).
        assert!(
            next_epoch > prev_epoch,
            "new epoch {} should be greater than current epoch: {}",
            next_epoch,
            prev_epoch
        );

        let mut tables = self.inner.tables.lock();
        let table_state = tables
            .get_mut(&self.table_id)
            .expect("should be set when init");

        table_state.next_epochs.insert(prev_epoch, next_epoch);
        if self.vnodes.len() > 1 {
            let sealing_epoch_vnodes = table_state
                .sealing_epochs
                .entry(prev_epoch)
                .or_insert_with(|| BitmapBuilder::zeroed(self.vnodes.len()));
            assert_eq!(self.vnodes.len(), sealing_epoch_vnodes.len());
            // Each vnode may only be sealed once per epoch.
            for vnode in self.vnodes.iter_ones() {
                assert!(!sealing_epoch_vnodes.is_set(vnode));
                sealing_epoch_vnodes.set(vnode, true);
            }
            if (0..self.vnodes.len()).all(|vnode| sealing_epoch_vnodes.is_set(vnode)) {
                // All vnodes have sealed `prev_epoch`; it must be the oldest
                // pending sealing epoch.
                let (all_sealed_epoch, _) =
                    table_state.sealing_epochs.pop_first().expect("non-empty");
                assert_eq!(
                    all_sealed_epoch, prev_epoch,
                    "new all_sealed_epoch must be the current prev epoch"
                );
                if let Some(prev_latest_sealed_epoch) =
                    table_state.latest_sealed_epoch.replace(prev_epoch)
                {
                    assert!(prev_epoch > prev_latest_sealed_epoch);
                }
            }
        }

        if let Some((direction, watermarks, _watermark_type)) = opts.table_watermarks {
            // Translate each vnode watermark into a key range to delete:
            // ascending watermarks invalidate keys below the watermark,
            // descending ones invalidate keys above it.
            let delete_ranges = watermarks
                .iter()
                .flat_map(|vnode_watermark| {
                    let inner_range = match direction {
                        WatermarkDirection::Ascending => {
                            (Unbounded, Excluded(vnode_watermark.watermark().clone()))
                        }
                        WatermarkDirection::Descending => {
                            (Excluded(vnode_watermark.watermark().clone()), Unbounded)
                        }
                    };
                    vnode_watermark
                        .vnode_bitmap()
                        .iter_vnodes()
                        .map(move |vnode| {
                            let (start, end) =
                                prefixed_range_with_vnode(inner_range.clone(), vnode);
                            (start.map(|key| key.0.clone()), end.map(|key| key.0.clone()))
                        })
                })
                .collect_vec();
            // Best-effort: a failed watermark write is logged, not propagated.
            if let Err(e) = self.inner.ingest_batch(
                Vec::new(),
                delete_ranges,
                WriteOptions {
                    epoch: self.epoch(),
                    table_id: self.table_id,
                },
            ) {
                error!(error = %e.as_report(), "failed to write delete ranges of table watermark");
            }
        }
    }

    /// No-op: the in-memory backend never needs to spill early.
    async fn try_flush(&mut self) -> StorageResult<()> {
        Ok(())
    }

    /// Replaces the owned vnode bitmap, returning the previous one. For
    /// multi-vnode tables, first waits until the previous epoch is fully
    /// sealed so the handover observes a consistent state.
    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
        if self.vnodes.len() > 1 {
            TableState::wait_epoch(
                &self.inner.tables,
                self.table_id,
                self.epoch.expect("should have init").prev,
            )
            .await;
        }
        Ok(std::mem::replace(&mut self.vnodes, vnodes))
    }

    fn get_table_watermark(&self, _vnode: VirtualNode) -> Option<Bytes> {
        // TODO: may store the written table watermark and have a correct implementation
        None
    }

    /// A snapshot over everything already flushed, read at `MAX_EPOCH`.
    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
        self.inner.new_read_snapshot_impl(MAX_EPOCH, self.table_id)
    }
}
1204
/// Forward iterator over a [`RangeKvStateStore`], yielding at most one (the
/// newest visible) version per user key.
pub struct RangeKvStateStoreIter<R: RangeKv> {
    inner: batched_iter::Iter<R>,

    /// Read epoch; versions newer than this are invisible.
    epoch: HummockEpoch,
    /// Whether a version written exactly at `epoch` is visible.
    is_inclusive_epoch: bool,

    /// Last user key examined, used to skip older versions of the same key.
    last_key: Option<UserKey<Bytes>>,

    /// The row returned by the next `try_next`, if any.
    item_buffer: Option<StateStoreKeyedRow>,
}
1215
1216impl<R: RangeKv> RangeKvStateStoreIter<R> {
1217    pub fn new(
1218        inner: batched_iter::Iter<R>,
1219        epoch: HummockEpoch,
1220        is_inclusive_epoch: bool,
1221    ) -> Self {
1222        Self {
1223            inner,
1224            epoch,
1225            is_inclusive_epoch,
1226            last_key: None,
1227            item_buffer: None,
1228        }
1229    }
1230}
1231
1232impl<R: RangeKv> StateStoreIter for RangeKvStateStoreIter<R> {
1233    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
1234        self.next_inner()?;
1235        Ok(self
1236            .item_buffer
1237            .as_ref()
1238            .map(|(key, value)| (key.to_ref(), value.as_ref())))
1239    }
1240}
1241
1242impl<R: RangeKv> RangeKvStateStoreIter<R> {
1243    fn next_inner(&mut self) -> StorageResult<()> {
1244        self.item_buffer = None;
1245        while let Some((key, value)) = self.inner.next()? {
1246            let epoch = key.epoch_with_gap.pure_epoch();
1247            if epoch > self.epoch {
1248                continue;
1249            }
1250            if epoch == self.epoch && !self.is_inclusive_epoch {
1251                continue;
1252            }
1253            if Some(key.user_key.as_ref()) != self.last_key.as_ref().map(|key| key.as_ref()) {
1254                self.last_key = Some(key.user_key.clone());
1255                if let Some(value) = value {
1256                    self.item_buffer = Some((key, value));
1257                    break;
1258                }
1259            }
1260        }
1261        Ok(())
1262    }
1263}
1264
/// Reverse iterator over a [`RangeKvStateStore`].
pub struct RangeKvStateStoreRevIter<R: RangeKv> {
    inner: batched_iter::Iter<R>,

    /// Read epoch; versions newer than this are invisible.
    epoch: HummockEpoch,
    /// Whether a version written exactly at `epoch` is visible.
    is_inclusive_epoch: bool,

    /// Small pipeline of resolved rows: candidates are pushed at the front,
    /// finished rows are consumed from the back.
    item_buffer: VecDeque<StateStoreKeyedRow>,
}
1273
1274impl<R: RangeKv> RangeKvStateStoreRevIter<R> {
1275    pub fn new(
1276        inner: batched_iter::Iter<R>,
1277        epoch: HummockEpoch,
1278        is_inclusive_epoch: bool,
1279    ) -> Self {
1280        Self {
1281            inner,
1282            epoch,
1283            is_inclusive_epoch,
1284            item_buffer: VecDeque::default(),
1285        }
1286    }
1287}
1288
1289impl<R: RangeKv> StateStoreIter for RangeKvStateStoreRevIter<R> {
1290    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
1291        self.next_inner()?;
1292        Ok(self
1293            .item_buffer
1294            .back()
1295            .map(|(key, value)| (key.to_ref(), value.as_ref())))
1296    }
1297}
1298
impl<R: RangeKv> RangeKvStateStoreRevIter<R> {
    /// Advances to the next visible row in reverse key order.
    ///
    /// NOTE(review): this assumes the underlying iterator yields entries in
    /// reverse key order with versions of the same user key from oldest to
    /// newest, so a later entry for the same key supersedes the current front
    /// candidate — confirm against `rev_iter_impl`.
    fn next_inner(&mut self) -> StorageResult<()> {
        // Drop the row handed out by the previous `try_next`.
        self.item_buffer.pop_back();
        while let Some((key, value)) = self.inner.next()? {
            let epoch = key.epoch_with_gap.pure_epoch();
            if epoch > self.epoch {
                continue;
            }
            if epoch == self.epoch && !self.is_inclusive_epoch {
                continue;
            }

            let v = match value {
                Some(v) => v,
                None => {
                    // Tombstone: a newer delete of the same user key discards
                    // the buffered candidate. NOTE(review): `clear()` looks
                    // safe only because, after the `pop_back` above, the
                    // buffer should hold entries of a single user key here —
                    // verify this invariant.
                    if let Some(last_key) = self.item_buffer.front()
                        && key.user_key.as_ref() == last_key.0.user_key.as_ref()
                    {
                        self.item_buffer.clear();
                    }
                    continue;
                }
            };

            if let Some(last_key) = self.item_buffer.front() {
                if key.user_key.as_ref() != last_key.0.user_key.as_ref() {
                    // New user key: the previous front candidate is final, and
                    // one finished row is available at the back.
                    self.item_buffer.push_front((key, v));
                    break;
                } else {
                    // Newer version of the same user key replaces the candidate.
                    self.item_buffer.pop_front();
                    self.item_buffer.push_front((key, v));
                }
            } else {
                self.item_buffer.push_front((key, v));
            }
        }
        Ok(())
    }
}
1338
/// Change-log iterator that merges a "new value" scan with an "old value"
/// scan of the same key range to reconstruct per-key changes.
pub struct RangeKvStateStoreChangeLogIter<R: RangeKv> {
    /// Rows as of the upper epoch of the log range.
    new_value_iter: RangeKvStateStoreIter<R>,
    /// Rows as of just before the lower epoch of the log range.
    old_value_iter: RangeKvStateStoreIter<R>,
    /// The change-log record returned by the next `try_next`, if any.
    item_buffer: Option<(TableKey<Bytes>, ChangeLogValue<Bytes>)>,
}
1344
1345impl<R: RangeKv> RangeKvStateStoreChangeLogIter<R> {
1346    fn new(
1347        mut new_value_iter: RangeKvStateStoreIter<R>,
1348        mut old_value_iter: RangeKvStateStoreIter<R>,
1349    ) -> StorageResult<Self> {
1350        new_value_iter.next_inner()?;
1351        old_value_iter.next_inner()?;
1352        Ok(Self {
1353            new_value_iter,
1354            old_value_iter,
1355            item_buffer: None,
1356        })
1357    }
1358}
1359
impl<R: RangeKv> StateStoreIter<StateStoreReadLogItem> for RangeKvStateStoreChangeLogIter<R> {
    /// Merge-joins the two key-sorted scans into change-log records:
    /// `Insert` for keys only on the new side, `Delete` for keys only on the
    /// old side, `Update` for keys on both sides with differing values.
    /// Keys whose value did not change are skipped entirely (the `loop`
    /// re-iterates via `continue` only in that case).
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreReadLogItemRef<'_>>> {
        loop {
            match (
                &self.new_value_iter.item_buffer,
                &self.old_value_iter.item_buffer,
            ) {
                (None, None) => {
                    // Both sides exhausted.
                    self.item_buffer = None;
                    break;
                }
                (Some((key, new_value)), None) => {
                    self.item_buffer = Some((
                        key.user_key.table_key.clone(),
                        ChangeLogValue::Insert(new_value.clone()),
                    ));
                    self.new_value_iter.next_inner()?;
                }
                (None, Some((key, old_value))) => {
                    self.item_buffer = Some((
                        key.user_key.table_key.clone(),
                        ChangeLogValue::Delete(old_value.clone()),
                    ));
                    self.old_value_iter.next_inner()?;
                }
                (Some((new_value_key, new_value)), Some((old_value_key, old_value))) => {
                    match new_value_key.user_key.cmp(&old_value_key.user_key) {
                        Ordering::Less => {
                            // Key exists only in the new snapshot: an insert.
                            self.item_buffer = Some((
                                new_value_key.user_key.table_key.clone(),
                                ChangeLogValue::Insert(new_value.clone()),
                            ));
                            self.new_value_iter.next_inner()?;
                        }
                        Ordering::Greater => {
                            // Key exists only in the old snapshot: a delete.
                            self.item_buffer = Some((
                                old_value_key.user_key.table_key.clone(),
                                ChangeLogValue::Delete(old_value.clone()),
                            ));
                            self.old_value_iter.next_inner()?;
                        }
                        Ordering::Equal => {
                            // Same key on both sides: an unchanged value is a
                            // no-op — skip it and keep merging.
                            if new_value == old_value {
                                self.new_value_iter.next_inner()?;
                                self.old_value_iter.next_inner()?;
                                continue;
                            }
                            self.item_buffer = Some((
                                new_value_key.user_key.table_key.clone(),
                                ChangeLogValue::Update {
                                    new_value: new_value.clone(),
                                    old_value: old_value.clone(),
                                },
                            ));
                            self.new_value_iter.next_inner()?;
                            self.old_value_iter.next_inner()?;
                        }
                    }
                }
            };
            break;
        }
        Ok(self
            .item_buffer
            .as_ref()
            .map(|(key, value)| (key.to_ref(), value.to_ref())))
    }
}
1428
1429#[cfg(test)]
1430mod tests {
1431    use risingwave_common::util::epoch::test_epoch;
1432
1433    use super::*;
1434    use crate::hummock::iterator::test_utils::{
1435        iterator_test_table_key_of, iterator_test_value_of,
1436    };
1437    use crate::hummock::test_utils::StateStoreReadTestExt;
1438    use crate::memory::sled::SledStateStore;
1439
    // Snapshot isolation must hold for the pure in-memory backend.
    #[tokio::test]
    async fn test_snapshot_isolation_memory() {
        let state_store = MemoryStateStore::new();
        test_snapshot_isolation_inner(state_store).await;
    }
1445
    // Same snapshot-isolation scenario against the sled-backed store.
    // Sled does real file I/O, so this is skipped under madsim.
    #[cfg(not(madsim))]
    #[tokio::test]
    async fn test_snapshot_isolation_sled() {
        let state_store = SledStateStore::new_temp();
        test_snapshot_isolation_inner(state_store).await;
    }
1452
    // Writes two epochs of data and verifies that scans/gets at each epoch see
    // exactly that epoch's snapshot: epoch-0 reads are unaffected by epoch-1
    // overwrites and deletes.
    async fn test_snapshot_isolation_inner(state_store: RangeKvStateStore<impl RangeKv>) {
        // Epoch 0: seed `a` and `b` with "v1".
        state_store
            .ingest_batch(
                vec![
                    (
                        TableKey(Bytes::from(b"a".to_vec())),
                        StorageValue::new_put(b"v1".to_vec()),
                    ),
                    (
                        TableKey(Bytes::from(b"b".to_vec())),
                        StorageValue::new_put(b"v1".to_vec()),
                    ),
                ],
                vec![],
                WriteOptions {
                    epoch: 0,
                    table_id: Default::default(),
                },
            )
            .unwrap();
        // Epoch 1: overwrite `a` with "v2" and delete `b`.
        state_store
            .ingest_batch(
                vec![
                    (
                        TableKey(Bytes::from(b"a".to_vec())),
                        StorageValue::new_put(b"v2".to_vec()),
                    ),
                    (
                        TableKey(Bytes::from(b"b".to_vec())),
                        StorageValue::new_delete(),
                    ),
                ],
                vec![],
                WriteOptions {
                    epoch: test_epoch(1),
                    table_id: Default::default(),
                },
            )
            .unwrap();
        // Scan at epoch 0 must still see both keys with "v1".
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    0,
                    TableId::default(),
                    None,
                )
                .unwrap(),
            vec![
                (
                    FullKey::for_test(Default::default(), Bytes::from("a"), 0)
                        .encode()
                        .into(),
                    b"v1".to_vec().into()
                ),
                (
                    FullKey::for_test(Default::default(), Bytes::from("b"), 0)
                        .encode()
                        .into(),
                    b"v1".to_vec().into()
                )
            ]
        );
        // The scan limit is honored.
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    0,
                    TableId::default(),
                    Some(1),
                )
                .unwrap(),
            vec![(
                FullKey::for_test(Default::default(), b"a".to_vec(), 0)
                    .encode()
                    .into(),
                b"v1".to_vec().into()
            )]
        );
        // Scan at epoch 1 sees the overwrite and the delete.
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    test_epoch(1),
                    TableId::default(),
                    None,
                )
                .unwrap(),
            vec![(
                FullKey::for_test(Default::default(), b"a".to_vec(), test_epoch(1))
                    .encode()
                    .into(),
                b"v2".to_vec().into()
            )]
        );
        // Point gets at epoch 0: both keys alive with "v1", `c` absent.
        assert_eq!(
            state_store
                .get(TableKey(Bytes::from("a")), 0, ReadOptions::default(),)
                .await
                .unwrap(),
            Some(Bytes::from("v1"))
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"b")),
                    0,
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            Some(b"v1".to_vec().into())
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"c")),
                    0,
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            None
        );
        // Point gets at epoch 1: `a` updated, `b` deleted, `c` still absent.
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"a")),
                    test_epoch(1),
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            Some(b"v2".to_vec().into())
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::from("b")),
                    test_epoch(1),
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            None
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::from("c")),
                    test_epoch(1),
                    ReadOptions::default()
                )
                .await
                .unwrap(),
            None
        );
    }
1620
    // Change-log iteration against the pure in-memory backend.
    #[tokio::test]
    async fn test_iter_log_memory() {
        let state_store = MemoryStateStore::new();
        test_iter_log_inner(state_store).await;
    }
1626
    // Change-log iteration against the sled-backed store.
    // Sled does real file I/O, so this is skipped under madsim.
    #[cfg(not(madsim))]
    #[tokio::test]
    async fn test_iter_log_sled() {
        let state_store = SledStateStore::new_temp();
        test_iter_log_inner(state_store).await;
    }
1633
    // Verifies `iter_log` over single epochs (all inserts / mixed changes)
    // and over a merged epoch range, where intermediate versions collapse.
    async fn test_iter_log_inner(state_store: RangeKvStateStore<impl RangeKv>) {
        let table_id = TableId::new(233);
        let epoch1 = test_epoch(1);
        let key_idx = [1, 2, 4];
        let make_key = |i| TableKey(Bytes::from(iterator_test_table_key_of(i)));
        let make_value = |i| Bytes::from(iterator_test_value_of(i));
        // Epoch 1: insert keys 1, 2 and 4.
        state_store
            .ingest_batch(
                key_idx
                    .iter()
                    .map(|i| (make_key(*i), StorageValue::new_put(make_value(*i))))
                    .collect(),
                vec![],
                WriteOptions {
                    epoch: epoch1,
                    table_id,
                },
            )
            .unwrap();
        // The epoch-1 log is all inserts.
        {
            let mut iter = state_store
                .iter_log(
                    (epoch1, epoch1),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            for i in key_idx {
                let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(make_key(i).to_ref(), iter_key);
                assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }

        // Epoch 2: update key 1, delete key 2, insert key 3.
        let epoch2 = test_epoch(2);
        state_store
            .ingest_batch(
                vec![
                    (make_key(1), StorageValue::new_put(make_value(12))), // update
                    (make_key(2), StorageValue::new_delete()),            // delete
                    (make_key(3), StorageValue::new_put(make_value(3))),
                ],
                vec![],
                WriteOptions {
                    epoch: epoch2,
                    table_id,
                },
            )
            .unwrap();

        // check iter log between two epoch
        {
            let expected = vec![
                (
                    make_key(1),
                    ChangeLogValue::Update {
                        new_value: make_value(12),
                        old_value: make_value(1),
                    },
                ),
                (make_key(2), ChangeLogValue::Delete(make_value(2))),
                (make_key(3), ChangeLogValue::Insert(make_value(3))),
            ];
            let mut iter = state_store
                .iter_log(
                    (epoch2, epoch2),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            for (key, change_log_value) in expected {
                let (iter_key, iter_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(
                    key.to_ref(),
                    iter_key,
                    "{:?} {:?}",
                    change_log_value.to_ref(),
                    iter_value
                );
                assert_eq!(change_log_value.to_ref(), iter_value);
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }
        // check iter log on the original old epoch
        {
            let mut iter = state_store
                .iter_log(
                    (epoch1, epoch1),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            for i in key_idx {
                let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(make_key(i).to_ref(), iter_key);
                assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }
        // check iter on merging the two epochs
        {
            let mut iter = state_store
                .iter_log(
                    (epoch1, epoch2),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            // Key 1 collapses to a single insert of its final value; key 2's
            // insert+delete cancels out entirely.
            let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
            assert_eq!(make_key(1).to_ref(), iter_key);
            assert_eq!(
                change_value,
                ChangeLogValue::Insert(make_value(12).as_ref())
            );
            for i in [3, 4] {
                let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(make_key(i).to_ref(), iter_key);
                assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }
    }
1761}