risingwave_storage/
memory.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
17use std::mem::take;
18use std::ops::Bound::{Excluded, Included, Unbounded};
19use std::ops::{Bound, RangeBounds};
20use std::sync::{Arc, LazyLock};
21
22use bytes::Bytes;
23use itertools::Itertools;
24use parking_lot::RwLock;
25use risingwave_common::array::VectorRef;
26use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
27use risingwave_common::catalog::TableId;
28use risingwave_common::dispatch_distance_measurement;
29use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
30use risingwave_common::id::FragmentId;
31use risingwave_common::types::ScalarRef;
32use risingwave_common::util::epoch::{EpochPair, MAX_EPOCH};
33use risingwave_hummock_sdk::key::{
34    FullKey, TableKey, TableKeyRange, UserKey, prefixed_range_with_vnode,
35};
36use risingwave_hummock_sdk::table_watermark::WatermarkDirection;
37use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
38use thiserror_ext::AsReport;
39use tokio::task::yield_now;
40use tracing::error;
41
42use crate::error::StorageResult;
43use crate::hummock::HummockError;
44use crate::hummock::utils::{
45    do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, merge_stream,
46    sanity_check_enabled,
47};
48use crate::mem_table::{KeyOp, MemTable};
49use crate::storage_value::StorageValue;
50use crate::store::*;
51use crate::vector::{MeasureDistanceBuilder, NearestBuilder};
52
/// A full key (table id + table key + epoch) with `Bytes`-backed key storage.
pub type BytesFullKey = FullKey<Bytes>;
/// A range of [`BytesFullKey`]s expressed as a pair of bounds.
pub type BytesFullKeyRange = (Bound<BytesFullKey>, Bound<BytesFullKey>);
55
/// Abstraction over the ordered key-value backend used by [`RangeKvStateStore`].
///
/// Implementations are cheaply cloneable handles to shared state (e.g. an
/// `Arc`-wrapped map or an embedded database handle). A `None` value is a
/// tombstone marking a deleted key.
pub trait RangeKv: Clone + Send + Sync + 'static {
    /// Scans `range` in ascending full-key order, returning at most `limit`
    /// entries (all entries when `limit` is `None`).
    fn range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>>;

    /// Scans `range` in descending full-key order, returning at most `limit`
    /// entries.
    fn rev_range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>>;

    /// Writes all pairs from `kv_pairs`, overwriting any existing entry with
    /// the same full key.
    fn ingest_batch(
        &self,
        kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
    ) -> StorageResult<()>;

    /// Flushes pending writes to durable storage, if the backend has any.
    fn flush(&self) -> StorageResult<()>;
}
76
/// Purely in-memory [`RangeKv`] backend: a shared, lock-protected `BTreeMap`.
pub type BTreeMapRangeKv = Arc<RwLock<BTreeMap<BytesFullKey, Option<Bytes>>>>;
78
79impl RangeKv for BTreeMapRangeKv {
80    fn range(
81        &self,
82        range: BytesFullKeyRange,
83        limit: Option<usize>,
84    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
85        let limit = limit.unwrap_or(usize::MAX);
86        Ok(self
87            .read()
88            .range(range)
89            .take(limit)
90            .map(|(key, value)| (key.clone(), value.clone()))
91            .collect())
92    }
93
94    fn rev_range(
95        &self,
96        range: BytesFullKeyRange,
97        limit: Option<usize>,
98    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
99        let limit = limit.unwrap_or(usize::MAX);
100        Ok(self
101            .read()
102            .range(range)
103            .rev()
104            .take(limit)
105            .map(|(key, value)| (key.clone(), value.clone()))
106            .collect())
107    }
108
109    fn ingest_batch(
110        &self,
111        kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
112    ) -> StorageResult<()> {
113        let mut inner = self.write();
114        for (key, value) in kv_pairs {
115            inner.insert(key, value);
116        }
117        Ok(())
118    }
119
120    fn flush(&self) -> StorageResult<()> {
121        Ok(())
122    }
123}
124
125pub mod sled {
126    use std::fs::create_dir_all;
127    use std::ops::RangeBounds;
128
129    use bytes::Bytes;
130    use risingwave_hummock_sdk::key::FullKey;
131
132    use crate::error::StorageResult;
133    use crate::memory::{BytesFullKey, BytesFullKeyRange, RangeKv, RangeKvStateStore};
134
    /// A [`RangeKv`] backend persisted to local disk via the sled embedded
    /// database.
    #[derive(Clone)]
    pub struct SledRangeKv {
        // Handle to the underlying sled database (cheap to clone).
        inner: sled::Db,
    }

    impl SledRangeKv {
        /// Opens (or creates) a sled database at `path`.
        pub fn new(path: impl AsRef<std::path::Path>) -> Self {
            SledRangeKv {
                inner: sled::open(path).expect("open"),
            }
        }

        /// Opens a sled database in a fresh temporary directory under
        /// `./.risingwave/sled`. `keep()` detaches the directory from the
        /// `TempDir` handle so it is not deleted when the handle drops.
        pub fn new_temp() -> Self {
            create_dir_all("./.risingwave/sled").expect("should create");
            let path = tempfile::TempDir::new_in("./.risingwave/sled")
                .expect("find temp dir")
                .keep();
            Self::new(path)
        }
    }
155
156    const EMPTY: u8 = 1;
157    const NON_EMPTY: u8 = 0;
158
159    impl RangeKv for SledRangeKv {
160        fn range(
161            &self,
162            range: BytesFullKeyRange,
163            limit: Option<usize>,
164        ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
165            let (left, right) = range;
166            let full_key_ref_bound = (
167                left.as_ref().map(FullKey::to_ref),
168                right.as_ref().map(FullKey::to_ref),
169            );
170            let left_encoded = left.as_ref().map(|key| key.to_ref().encode_reverse_epoch());
171            let right_encoded = right
172                .as_ref()
173                .map(|key| key.to_ref().encode_reverse_epoch());
174            let limit = limit.unwrap_or(usize::MAX);
175            let mut ret = vec![];
176            for result in self.inner.range((left_encoded, right_encoded)).take(limit) {
177                let (key, value) = result?;
178                let full_key = FullKey::decode_reverse_epoch(key.as_ref()).copy_into();
179                if !full_key_ref_bound.contains(&full_key.to_ref()) {
180                    continue;
181                }
182                let value = match value.as_ref() {
183                    [EMPTY] => None,
184                    [NON_EMPTY, rest @ ..] => Some(Bytes::from(Vec::from(rest))),
185                    _ => unreachable!("malformed value: {:?}", value),
186                };
187                ret.push((full_key, value))
188            }
189            Ok(ret)
190        }
191
192        fn rev_range(
193            &self,
194            range: BytesFullKeyRange,
195            limit: Option<usize>,
196        ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
197            let (left, right) = range;
198            let full_key_ref_bound = (
199                left.as_ref().map(FullKey::to_ref),
200                right.as_ref().map(FullKey::to_ref),
201            );
202            let left_encoded = left.as_ref().map(|key| key.to_ref().encode_reverse_epoch());
203            let right_encoded = right
204                .as_ref()
205                .map(|key| key.to_ref().encode_reverse_epoch());
206            let limit = limit.unwrap_or(usize::MAX);
207            let mut ret = vec![];
208            for result in self
209                .inner
210                .range((left_encoded, right_encoded))
211                .rev()
212                .take(limit)
213            {
214                let (key, value) = result?;
215                let full_key = FullKey::decode_reverse_epoch(key.as_ref()).copy_into();
216                if !full_key_ref_bound.contains(&full_key.to_ref()) {
217                    continue;
218                }
219                let value = match value.as_ref() {
220                    [EMPTY] => None,
221                    [NON_EMPTY, rest @ ..] => Some(Bytes::from(Vec::from(rest))),
222                    _ => unreachable!("malformed value: {:?}", value),
223                };
224                ret.push((full_key, value))
225            }
226            Ok(ret)
227        }
228
229        fn ingest_batch(
230            &self,
231            kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
232        ) -> StorageResult<()> {
233            let mut batch = sled::Batch::default();
234            for (key, value) in kv_pairs {
235                let encoded_key = key.encode_reverse_epoch();
236                let key = sled::IVec::from(encoded_key);
237                let mut buffer =
238                    Vec::with_capacity(value.as_ref().map(|v| v.len()).unwrap_or_default() + 1);
239                if let Some(value) = value {
240                    buffer.push(NON_EMPTY);
241                    buffer.extend_from_slice(value.as_ref());
242                } else {
243                    buffer.push(EMPTY);
244                }
245                let value = sled::IVec::from(buffer);
246                batch.insert(key, value);
247            }
248            self.inner.apply_batch(batch)?;
249            Ok(())
250        }
251
252        fn flush(&self) -> StorageResult<()> {
253            Ok(self.inner.flush().map(|_| {})?)
254        }
255    }
256
    /// A [`RangeKvStateStore`] backed by sled, i.e. persisted to local disk.
    pub type SledStateStore = RangeKvStateStore<SledRangeKv>;

    impl SledStateStore {
        /// Creates a sled-backed state store at `path`.
        pub fn new(path: impl AsRef<std::path::Path>) -> Self {
            RangeKvStateStore {
                inner: SledRangeKv::new(path),
                tables: Default::default(),
                vectors: Default::default(),
            }
        }

        /// Creates a sled-backed state store in a fresh temporary directory.
        pub fn new_temp() -> Self {
            RangeKvStateStore {
                inner: SledRangeKv::new_temp(),
                tables: Default::default(),
                vectors: Default::default(),
            }
        }
    }
276
    #[cfg(test)]
    mod test {
        use std::ops::{Bound, RangeBounds};

        use bytes::Bytes;
        use risingwave_common::catalog::TableId;
        use risingwave_common::util::epoch::EPOCH_SPILL_TIME_MASK;
        use risingwave_hummock_sdk::EpochWithGap;
        use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};

        use crate::memory::RangeKv;
        use crate::memory::sled::SledRangeKv;

        /// The reverse-epoch encoding is not order-preserving for
        /// variable-length keys: a key outside the decoded range can still
        /// fall inside the *encoded* range (a false positive). This test
        /// constructs such a key and asserts that `SledRangeKv::range`
        /// filters it out by re-checking decoded keys against the bounds.
        #[test]
        fn test_filter_variable_key_length_false_positive() {
            let table_id = TableId::new(233);
            let epoch = u64::MAX - u64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]);
            let excluded_short_table_key = [0, 1, 0, 0];
            let included_long_table_key = [0, 1, 0, 0, 1, 2];
            let left_table_key = [0, 1, 0, 0, 1];
            let right_table_key = [0, 1, 1, 1];

            let to_full_key = |table_key: &[u8]| FullKey {
                user_key: UserKey {
                    table_id,
                    table_key: TableKey(Bytes::from(table_key.to_vec())),
                },
                epoch_with_gap: EpochWithGap::new_from_epoch(epoch & !EPOCH_SPILL_TIME_MASK),
            };

            let left_full_key = to_full_key(&left_table_key[..]);
            let right_full_key = to_full_key(&right_table_key[..]);
            let included_long_full_key = to_full_key(&included_long_table_key[..]);
            let excluded_short_full_key = to_full_key(&excluded_short_table_key[..]);

            // On decoded keys: the long key is inside the bounds, the short
            // key is outside.
            assert!(
                (
                    Bound::Included(left_full_key.to_ref()),
                    Bound::Included(right_full_key.to_ref())
                )
                    .contains(&included_long_full_key.to_ref())
            );
            assert!(
                !(
                    Bound::Included(left_full_key.to_ref()),
                    Bound::Included(right_full_key.to_ref())
                )
                    .contains(&excluded_short_full_key.to_ref())
            );

            let left_encoded = left_full_key.encode_reverse_epoch();
            let right_encoded = right_full_key.encode_reverse_epoch();

            // On encoded keys: BOTH fall inside — the false positive.
            assert!(
                (
                    Bound::Included(left_encoded.clone()),
                    Bound::Included(right_encoded.clone())
                )
                    .contains(&included_long_full_key.encode_reverse_epoch())
            );
            assert!(
                (
                    Bound::Included(left_encoded),
                    Bound::Included(right_encoded)
                )
                    .contains(&excluded_short_full_key.encode_reverse_epoch())
            );

            // The store must therefore return only the genuinely included key.
            let sled_range_kv = SledRangeKv::new_temp();
            sled_range_kv
                .ingest_batch(
                    vec![
                        (included_long_full_key.clone(), None),
                        (excluded_short_full_key, None),
                    ]
                    .into_iter(),
                )
                .unwrap();
            let kvs = sled_range_kv
                .range(
                    (
                        Bound::Included(left_full_key),
                        Bound::Included(right_full_key),
                    ),
                    None,
                )
                .unwrap();
            assert_eq!(1, kvs.len());
            assert_eq!(included_long_full_key.to_ref(), kvs[0].0.to_ref());
            assert!(kvs[0].1.is_none());
        }
    }
369}
370
371mod batched_iter {
372
373    use super::*;
374
    /// A utility struct for iterating over a range of keys in a locked `BTreeMap`, which will batch
    /// some records to make a trade-off between the copying overhead and the times of acquiring
    /// the lock.
    ///
    /// Therefore, it's not guaranteed that we're iterating over a consistent snapshot of the map.
    /// Users should handle MVCC by themselves.
    pub struct Iter<R: RangeKv> {
        // Handle to the underlying store a fresh batch is fetched from.
        inner: R,
        // Remaining range to scan; shrunk past the last yielded key after each batch.
        range: BytesFullKeyRange,
        // Buffered entries of the current batch.
        current: std::vec::IntoIter<(FullKey<Bytes>, Option<Bytes>)>,
        // `true` for descending (reverse) iteration.
        rev: bool,
    }

    impl<R: RangeKv> Iter<R> {
        /// Creates an iterator over `range`; iterates descending when `rev`.
        pub fn new(inner: R, range: BytesFullKeyRange, rev: bool) -> Self {
            Self {
                inner,
                range,
                rev,
                current: Vec::new().into_iter(),
            }
        }
    }
398
    impl<R: RangeKv> Iter<R> {
        /// Max number of records fetched from the store per lock acquisition.
        const BATCH_SIZE: usize = 256;

        /// Get the next batch of records and fill the `current` buffer.
        fn refill(&mut self) -> StorageResult<()> {
            assert!(self.current.is_empty());

            // Fetch one batch in the configured scan direction.
            let batch = if self.rev {
                self.inner.rev_range(
                    (self.range.0.clone(), self.range.1.clone()),
                    Some(Self::BATCH_SIZE),
                )?
            } else {
                self.inner.range(
                    (self.range.0.clone(), self.range.1.clone()),
                    Some(Self::BATCH_SIZE),
                )?
            };

            // Shrink the remaining range so the next refill resumes strictly
            // after (or before, when reversed) the last key of this batch.
            if let Some((last_key, _)) = batch.last() {
                let full_key = FullKey::new_with_gap_epoch(
                    last_key.user_key.table_id,
                    TableKey(last_key.user_key.table_key.0.clone()),
                    last_key.epoch_with_gap,
                );
                if self.rev {
                    self.range.1 = Bound::Excluded(full_key);
                } else {
                    self.range.0 = Bound::Excluded(full_key);
                }
            }
            self.current = batch.into_iter();
            Ok(())
        }
    }
434
435    impl<R: RangeKv> Iter<R> {
436        pub fn next(&mut self) -> StorageResult<Option<(BytesFullKey, Option<Bytes>)>> {
437            match self.current.next() {
438                Some((key, value)) => Ok(Some((key, value))),
439                None => {
440                    self.refill()?;
441                    Ok(self.current.next())
442                }
443            }
444        }
445    }
446
    #[cfg(test)]
    mod tests {
        use rand::Rng;

        use super::*;
        use crate::memory::sled::SledRangeKv;

        #[test]
        fn test_btreemap_iter_chaos() {
            let map = Arc::new(RwLock::new(BTreeMap::new()));
            test_iter_chaos_inner(map, 1000);
        }

        #[cfg(not(madsim))]
        #[test]
        fn test_sled_iter_chaos() {
            let map = SledRangeKv::new_temp();
            test_iter_chaos_inner(map, 100);
        }

        // Randomized test: iterate `count` random ranges with the batched
        // `Iter` and compare the result against a direct one-shot `range` scan.
        fn test_iter_chaos_inner(map: impl RangeKv, count: usize) {
            let key_range = 1..=10000;
            let num_to_bytes = |k: i32| Bytes::from(format!("{:06}", k).as_bytes().to_vec());
            let num_to_full_key =
                |k: i32| FullKey::new(TableId::default(), TableKey(num_to_bytes(k)), 0);

            map.ingest_batch(key_range.clone().map(|k| {
                let key = num_to_full_key(k);
                let b = key.user_key.table_key.0.clone();

                (key, Some(b))
            }))
            .unwrap();

            let rand_bound = || {
                let key = rand::rng().random_range(key_range.clone());
                let key = num_to_full_key(key);
                match rand::rng().random_range(1..=5) {
                    1 | 2 => Bound::Included(key),
                    3 | 4 => Bound::Excluded(key),
                    _ => Bound::Unbounded,
                }
            };

            for _ in 0..count {
                let range = loop {
                    let range = (rand_bound(), rand_bound());
                    let (start, end) = (range.start_bound(), range.end_bound());

                    // Filter out invalid ranges. Code migrated from `BTreeMap::range`.
                    match (start, end) {
                        (Bound::Excluded(s), Bound::Excluded(e)) if s == e => {
                            continue;
                        }
                        (
                            Bound::Included(s) | Bound::Excluded(s),
                            Bound::Included(e) | Bound::Excluded(e),
                        ) if s > e => {
                            continue;
                        }
                        _ => break range,
                    }
                };

                let v1 = {
                    let mut v = vec![];
                    let mut iter = Iter::new(map.clone(), range.clone(), false);
                    while let Some((key, value)) = iter.next().unwrap() {
                        v.push((key, value));
                    }
                    v
                };
                let v2 = map.range(range, None).unwrap();

                // Items iterated from the batched iterator should be the same as the normal iterator.
                assert_eq!(v1, v2);
            }
        }
    }
526}
527
/// A state store backed entirely by an in-process `BTreeMap` — for tests only.
pub type MemoryStateStore = RangeKvStateStore<BTreeMapRangeKv>;
529
// Per-table epoch bookkeeping for the in-memory state store.
struct TableState {
    // Epoch the table was initialized with.
    init_epoch: u64,
    // Maps each `prev_epoch` to the `curr_epoch` that follows it.
    next_epochs: BTreeMap<u64, u64>,
    // Most recently sealed epoch, if any epoch has been sealed yet.
    latest_sealed_epoch: Option<u64>,
    // Epochs currently being sealed. NOTE(review): the bitmap presumably
    // tracks which vnodes have sealed so far — usage is outside this chunk.
    sealing_epochs: BTreeMap<u64, BitmapBuilder>,
}
536
impl TableState {
    /// Creates the state for a table initialized at `init_epoch`.
    fn new(init_epoch: u64) -> Self {
        Self {
            init_epoch,
            next_epochs: Default::default(),
            latest_sealed_epoch: None,
            sealing_epochs: Default::default(),
        }
    }

    /// Busy-waits (yielding to the async runtime between polls) until `epoch`
    /// has been sealed for `table_id`; returns immediately when `epoch` is the
    /// table's init epoch.
    ///
    /// Panics if the table is unknown or `epoch` predates the init epoch.
    async fn wait_epoch(
        tables: &parking_lot::Mutex<HashMap<TableId, Self>>,
        table_id: TableId,
        epoch: u64,
    ) {
        loop {
            {
                // Scope the guard so the (non-async-aware) parking_lot lock is
                // released before yielding; holding it across `await` would
                // block writers from making progress.
                let tables = tables.lock();
                let table_state = tables.get(&table_id).expect("should exist");
                assert!(epoch >= table_state.init_epoch);
                if epoch == table_state.init_epoch {
                    return;
                }
                if let Some(latest_sealed_epoch) = table_state.latest_sealed_epoch
                    && latest_sealed_epoch >= epoch
                {
                    return;
                }
            }
            yield_now().await;
        }
    }
}
570
/// Shared in-memory vector store: per table, a list of (vector, info payload, ingest epoch).
type InMemVectorStore = Arc<RwLock<HashMap<TableId, Vec<(Vector, Bytes, u64)>>>>;
572
/// An in-memory state store
///
/// The in-memory state store is a [`BTreeMap`], which maps [`FullKey`] to value. It
/// never does GC, so the memory usage will be high. Therefore, in-memory state store should never
/// be used in production.
#[derive(Clone, Default)]
pub struct RangeKvStateStore<R: RangeKv> {
    /// Stores (key, epoch) -> user value.
    inner: R,
    /// `table_id` -> `prev_epoch` -> `curr_epoch`
    tables: Arc<parking_lot::Mutex<HashMap<TableId, TableState>>>,

    /// Vector index entries, tagged with the epoch they were ingested at.
    vectors: InMemVectorStore,
}
587
588fn to_full_key_range<R, B>(table_id: TableId, table_key_range: R) -> BytesFullKeyRange
589where
590    R: RangeBounds<B> + Send,
591    B: AsRef<[u8]>,
592{
593    let start = match table_key_range.start_bound() {
594        Included(k) => Included(FullKey::new(
595            table_id,
596            TableKey(Bytes::from(k.as_ref().to_vec())),
597            HummockEpoch::MAX,
598        )),
599        Excluded(k) => Excluded(FullKey::new(
600            table_id,
601            TableKey(Bytes::from(k.as_ref().to_vec())),
602            0,
603        )),
604        Unbounded => Included(FullKey::new(
605            table_id,
606            TableKey(Bytes::from(b"".to_vec())),
607            HummockEpoch::MAX,
608        )),
609    };
610    let end = match table_key_range.end_bound() {
611        Included(k) => Included(FullKey::new(
612            table_id,
613            TableKey(Bytes::from(k.as_ref().to_vec())),
614            0,
615        )),
616        Excluded(k) => Excluded(FullKey::new(
617            table_id,
618            TableKey(Bytes::from(k.as_ref().to_vec())),
619            HummockEpoch::MAX,
620        )),
621        Unbounded => {
622            if let Some(next_table_id) = table_id.as_raw_id().checked_add(1) {
623                Excluded(FullKey::new(
624                    next_table_id.into(),
625                    TableKey(Bytes::from(b"".to_vec())),
626                    HummockEpoch::MAX,
627                ))
628            } else {
629                Unbounded
630            }
631        }
632    };
633    (start, end)
634}
635
impl MemoryStateStore {
    /// Creates a new, empty in-memory state store.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns a handle to a process-wide shared instance, so all callers
    /// observe the same data.
    pub fn shared() -> Self {
        static STORE: LazyLock<MemoryStateStore> = LazyLock::new(MemoryStateStore::new);
        STORE.clone()
    }
}
646
647impl<R: RangeKv> RangeKvStateStore<R> {
648    fn scan(
649        &self,
650        key_range: TableKeyRange,
651        epoch: u64,
652        table_id: TableId,
653        limit: Option<usize>,
654    ) -> StorageResult<Vec<(Bytes, Bytes)>> {
655        let mut data = vec![];
656        if limit == Some(0) {
657            return Ok(vec![]);
658        }
659        let mut last_user_key = None;
660        for (key, value) in self
661            .inner
662            .range(to_full_key_range(table_id, key_range), None)?
663        {
664            if key.epoch_with_gap.pure_epoch() > epoch {
665                continue;
666            }
667            if Some(&key.user_key) != last_user_key.as_ref() {
668                if let Some(value) = value {
669                    data.push((Bytes::from(key.encode()), value.clone()));
670                }
671                last_user_key = Some(key.user_key.clone());
672            }
673            if let Some(limit) = limit
674                && data.len() >= limit
675            {
676                break;
677            }
678        }
679        Ok(data)
680    }
681}
682
/// A read-only view of a [`RangeKvStateStore`] pinned to a specific epoch and
/// table.
#[derive(Clone)]
pub struct RangeKvStateStoreReadSnapshot<R: RangeKv> {
    // Underlying store; cloning shares state.
    inner: RangeKvStateStore<R>,
    // Epoch all reads through this snapshot are performed at.
    epoch: u64,
    // Table this snapshot is scoped to.
    table_id: TableId,
}
689
690impl<R: RangeKv> StateStoreGet for RangeKvStateStoreReadSnapshot<R> {
691    async fn on_key_value<'a, O: Send + 'a>(
692        &'a self,
693        key: TableKey<Bytes>,
694        _read_options: ReadOptions,
695        on_key_value_fn: impl KeyValueFn<'a, O>,
696    ) -> StorageResult<Option<O>> {
697        self.inner
698            .get_keyed_row_impl(key, self.epoch, self.table_id)
699            .and_then(|option| {
700                if let Some((key, value)) = option {
701                    on_key_value_fn(key.to_ref(), value.as_ref()).map(Some)
702                } else {
703                    Ok(None)
704                }
705            })
706    }
707}
708
impl<R: RangeKv> StateStoreRead for RangeKvStateStoreReadSnapshot<R> {
    type Iter = RangeKvStateStoreIter<R>;
    type RevIter = RangeKvStateStoreRevIter<R>;

    /// Creates a forward iterator over `key_range` at this snapshot's epoch.
    async fn iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::Iter> {
        self.inner.iter_impl(key_range, self.epoch, self.table_id)
    }

    /// Creates a backward (descending) iterator over `key_range` at this
    /// snapshot's epoch.
    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter> {
        self.inner
            .rev_iter_impl(key_range, self.epoch, self.table_id)
    }
}
730
impl<R: RangeKv> StateStoreReadVector for RangeKvStateStoreReadSnapshot<R> {
    /// Returns the `top_n` items nearest to `vec` among this table's vectors
    /// that were ingested at or before the snapshot epoch.
    async fn nearest<'a, O: Send + 'a>(
        &'a self,
        vec: VectorRef<'a>,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
    ) -> StorageResult<Vec<O>> {
        // Monomorphized body, instantiated per distance-measurement type by
        // the dispatch macro below.
        fn nearest_impl<'a, M: MeasureDistanceBuilder, O>(
            store: &'a InMemVectorStore,
            epoch: u64,
            table_id: TableId,
            vec: VectorRef<'a>,
            options: VectorNearestOptions,
            on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
        ) -> Vec<O> {
            let mut builder = NearestBuilder::<'_, O, M>::new(vec, options.top_n);
            builder.add(
                store
                    .read()
                    .get(&table_id)
                    .map(|vec| vec.iter())
                    .into_iter()
                    .flatten()
                    // Only vectors ingested no later than the read epoch are visible.
                    .filter(|(_, _, vector_epoch)| epoch >= *vector_epoch)
                    .map(|(vec, info, _)| (vec.to_ref(), info.as_ref())),
                on_nearest_item_fn,
            );
            builder.finish()
        }
        dispatch_distance_measurement!(options.measure, MeasurementType, {
            Ok(nearest_impl::<MeasurementType, O>(
                &self.inner.vectors,
                self.epoch,
                self.table_id,
                vec,
                options,
                on_nearest_item_fn,
            ))
        })
    }
}
772
impl<R: RangeKv> RangeKvStateStore<R> {
    /// Point-gets the newest keyed row for `key` visible at `epoch`, by
    /// scanning the single-key range with a limit of 1.
    fn get_keyed_row_impl(
        &self,
        key: TableKey<Bytes>,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<Option<StateStoreKeyedRow>> {
        let range_bounds = (Bound::Included(key.clone()), Bound::Included(key));
        // We do not really care about vnodes here, so we just use the default value.
        let res = self.scan(range_bounds, epoch, table_id, Some(1))?;

        Ok(match res.as_slice() {
            [] => None,
            [(key, value)] => Some((
                FullKey::decode(key.as_ref()).to_vec().into_bytes(),
                value.clone(),
            )),
            // `scan` was called with limit 1, so at most one row is possible.
            _ => unreachable!(),
        })
    }

    /// Builds a forward iterator over `key_range` of `table_id` at `epoch`.
    fn iter_impl(
        &self,
        key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<RangeKvStateStoreIter<R>> {
        Ok(RangeKvStateStoreIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(table_id, key_range),
                false,
            ),
            epoch,
            true,
        ))
    }

    /// Builds a backward iterator over `key_range` of `table_id` at `epoch`.
    fn rev_iter_impl(
        &self,
        key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<RangeKvStateStoreRevIter<R>> {
        Ok(RangeKvStateStoreRevIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(table_id, key_range),
                true,
            ),
            epoch,
            true,
        ))
    }
}
828
impl<R: RangeKv> StateStoreReadLog for RangeKvStateStore<R> {
    type ChangeLogIter = RangeKvStateStoreChangeLogIter<R>;

    /// Resolves the epoch that follows `epoch` for the given table,
    /// busy-waiting (with runtime yields) until a successor is recorded.
    ///
    /// Errors if the table is unknown to this store.
    async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
        loop {
            {
                // Scope the lock guard so it is dropped before the
                // `yield_now().await` below.
                let tables = self.tables.lock();
                let Some(tables) = tables.get(&options.table_id) else {
                    return Err(HummockError::next_epoch(format!(
                        "table {} not exist",
                        options.table_id
                    ))
                    .into());
                };
                if let Some(next_epoch) = tables.next_epochs.get(&epoch) {
                    break Ok(*next_epoch);
                }
            }
            yield_now().await;
        }
    }

    /// Builds a change-log iterator over `key_range` for `[min_epoch,
    /// max_epoch]`, combining a scan at `max_epoch` (new values) with a scan
    /// at `min_epoch` (old values).
    async fn iter_log(
        &self,
        (min_epoch, max_epoch): (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> StorageResult<Self::ChangeLogIter> {
        let new_value_iter = RangeKvStateStoreIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(options.table_id, key_range.clone()),
                false,
            ),
            max_epoch,
            true,
        );
        // NOTE(review): the trailing bool flag differs between the two iters
        // (`true` vs `false`); its meaning is defined by
        // `RangeKvStateStoreIter::new`, which is outside this chunk.
        let old_value_iter = RangeKvStateStoreIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(options.table_id, key_range),
                false,
            ),
            min_epoch,
            false,
        );
        RangeKvStateStoreChangeLogIter::new(new_value_iter, old_value_iter)
    }
}
878
impl<R: RangeKv> RangeKvStateStore<R> {
    /// Creates a read snapshot pinned at `epoch` for `table_id`.
    fn new_read_snapshot_impl(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> RangeKvStateStoreReadSnapshot<R> {
        RangeKvStateStoreReadSnapshot {
            inner: self.clone(),
            epoch,
            table_id,
        }
    }

    /// Writes `kv_pairs` at `epoch`, after expanding `delete_ranges` into
    /// per-key tombstones for every existing key in those ranges.
    ///
    /// Returns the total ingested size in bytes (key lengths + value sizes).
    pub(crate) fn ingest_batch(
        &self,
        mut kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
        delete_ranges: Vec<(Bound<Bytes>, Bound<Bytes>)>,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<usize> {
        // Materialize the set of existing keys covered by the delete ranges.
        let mut delete_keys = BTreeSet::new();
        for del_range in delete_ranges {
            for (key, _) in self.inner.range(
                (
                    del_range
                        .0
                        .map(|table_key| FullKey::new(table_id, TableKey(table_key), epoch)),
                    del_range
                        .1
                        .map(|table_key| FullKey::new(table_id, TableKey(table_key), epoch)),
                ),
                None,
            )? {
                delete_keys.insert(key.user_key.table_key);
            }
        }
        // Turn each covered key into an explicit tombstone write.
        for key in delete_keys {
            kv_pairs.push((key, StorageValue::new_delete()));
        }

        let mut size = 0;
        // `size` is accumulated as a side effect while the iterator is drained
        // by the underlying `ingest_batch`.
        self.inner
            .ingest_batch(kv_pairs.into_iter().map(|(key, value)| {
                size += key.len() + value.size();
                (FullKey::new(table_id, key, epoch), value.user_value)
            }))?;
        Ok(size)
    }

    /// Appends vectors (with their info payload) for `table_id`, tagging each
    /// entry with `epoch` for snapshot-visibility filtering.
    fn ingest_vectors(&self, table_id: TableId, epoch: u64, vecs: Vec<(Vector, Bytes)>) {
        self.vectors
            .write()
            .entry(table_id)
            .or_default()
            .extend(vecs.into_iter().map(|(vec, info)| (vec, info, epoch)));
    }
}
936
937impl<R: RangeKv> StateStore for RangeKvStateStore<R> {
938    type Local = RangeKvLocalStateStore<R>;
939    type ReadSnapshot = RangeKvStateStoreReadSnapshot<R>;
940    type VectorWriter = RangeKvLocalStateStore<R>;
941
942    async fn try_wait_epoch(
943        &self,
944        _epoch: HummockReadEpoch,
945        _options: TryWaitEpochOptions,
946    ) -> StorageResult<()> {
947        // memory backend doesn't need to wait for epoch, so this is a no-op.
948        Ok(())
949    }
950
951    async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
952        RangeKvLocalStateStore::new(self.clone(), option)
953    }
954
955    async fn new_read_snapshot(
956        &self,
957        epoch: HummockReadEpoch,
958        options: NewReadSnapshotOptions,
959    ) -> StorageResult<Self::ReadSnapshot> {
960        Ok(self.new_read_snapshot_impl(epoch.get_epoch(), options.table_id))
961    }
962
963    async fn new_vector_writer(&self, options: NewVectorWriterOptions) -> Self::VectorWriter {
964        RangeKvLocalStateStore::new(
965            self.clone(),
966            NewLocalOptions {
967                table_id: options.table_id,
968                fragment_id: FragmentId::default(),
969                op_consistency_level: Default::default(),
970                table_option: Default::default(),
971                is_replicated: false,
972                vnodes: Arc::new(Bitmap::from_bool_slice(&[true])),
973                upload_on_flush: true,
974            },
975        )
976    }
977}
978
/// Table-local front-end over a shared [`RangeKvStateStore`]: buffers writes in
/// a mem-table and flushes them into the shared store per epoch.
pub struct RangeKvLocalStateStore<R: RangeKv> {
    // Uncommitted key ops, drained into `inner` on flush.
    mem_table: MemTable,
    // Vectors buffered locally until the next flush.
    vectors: Vec<(Vector, Bytes)>,
    inner: RangeKvStateStore<R>,

    // `None` until `init` is called; afterwards holds the (prev, curr) epochs.
    epoch: Option<EpochPair>,

    table_id: TableId,
    op_consistency_level: OpConsistencyLevel,
    // Vnodes owned by this local store instance.
    vnodes: Arc<Bitmap>,
}
990
991impl<R: RangeKv> RangeKvLocalStateStore<R> {
992    pub fn new(inner: RangeKvStateStore<R>, option: NewLocalOptions) -> Self {
993        Self {
994            inner,
995            mem_table: MemTable::new(option.table_id, option.op_consistency_level.clone()),
996            epoch: None,
997            table_id: option.table_id,
998            op_consistency_level: option.op_consistency_level,
999            vnodes: option.vnodes,
1000            vectors: vec![],
1001        }
1002    }
1003
1004    fn epoch(&self) -> u64 {
1005        self.epoch.expect("should have set the epoch").curr
1006    }
1007}
1008
1009impl<R: RangeKv> StateStoreGet for RangeKvLocalStateStore<R> {
1010    async fn on_key_value<'a, O: Send + 'a>(
1011        &'a self,
1012        key: TableKey<Bytes>,
1013        _read_options: ReadOptions,
1014        on_key_value_fn: impl KeyValueFn<'a, O>,
1015    ) -> StorageResult<Option<O>> {
1016        if let Some((key, value)) = match self.mem_table.buffer.get(&key) {
1017            None => self
1018                .inner
1019                .get_keyed_row_impl(key, self.epoch(), self.table_id)?,
1020            Some(op) => match op {
1021                KeyOp::Insert(value) | KeyOp::Update((_, value)) => Some((
1022                    FullKey::new(self.table_id, key, self.epoch()),
1023                    value.clone(),
1024                )),
1025                KeyOp::Delete(_) => None,
1026            },
1027        } {
1028            Ok(Some(on_key_value_fn(key.to_ref(), value.as_ref())?))
1029        } else {
1030            Ok(None)
1031        }
1032    }
1033}
1034
impl<R: RangeKv> LocalStateStore for RangeKvLocalStateStore<R> {
    type FlushedSnapshotReader = RangeKvStateStoreReadSnapshot<R>;

    type Iter<'a> = impl StateStoreIter + 'a;
    type RevIter<'a> = impl StateStoreIter + 'a;

    /// Forward iteration over `key_range` at the current epoch, merging
    /// uncommitted mem-table ops with rows already in the shared store.
    async fn iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::Iter<'_>> {
        let iter = self
            .inner
            .iter_impl(key_range.clone(), self.epoch(), self.table_id)?;
        Ok(FromStreamStateStoreIter::new(Box::pin(merge_stream(
            self.mem_table.iter(key_range),
            iter.into_stream(to_owned_item),
            self.table_id,
            self.epoch(),
            false,
        ))))
    }

    /// Reverse-order counterpart of [`Self::iter`].
    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter<'_>> {
        let iter = self
            .inner
            .rev_iter_impl(key_range.clone(), self.epoch(), self.table_id)?;
        Ok(FromStreamStateStoreIter::new(Box::pin(merge_stream(
            self.mem_table.rev_iter(key_range),
            iter.into_stream(to_owned_item),
            self.table_id,
            self.epoch(),
            true,
        ))))
    }

    /// Record an insert (or an update when `old_val` is given) in the
    /// mem-table; nothing reaches the shared store until `flush`.
    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()> {
        match old_val {
            None => self.mem_table.insert(key, new_val)?,
            Some(old_val) => self.mem_table.update(key, old_val, new_val)?,
        };
        Ok(())
    }

    /// Record a delete of `key` (expected to currently hold `old_val`) in the
    /// mem-table.
    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
        Ok(self.mem_table.delete(key, old_val)?)
    }

    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
        // With multiple vnodes, sibling local stores may still be sealing the
        // previous epoch; wait until it is sealed everywhere before the vnode
        // ownership changes hands.
        if self.vnodes.len() > 1 {
            TableState::wait_epoch(
                &self.inner.tables,
                self.table_id,
                self.epoch.expect("should have init").prev,
            )
            .await;
        }
        Ok(std::mem::replace(&mut self.vnodes, vnodes))
    }

    fn get_table_watermark(&self, _vnode: VirtualNode) -> Option<Bytes> {
        // TODO: may store the written table watermark and have a correct implementation
        None
    }

    /// Reader over everything already flushed to the shared store, at any epoch
    /// (`MAX_EPOCH` read bound).
    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
        self.inner.new_read_snapshot_impl(MAX_EPOCH, self.table_id)
    }
}
1113
impl<R: RangeKv> StateStoreWriteEpochControl for RangeKvLocalStateStore<R> {
    /// Drain the mem-table and buffered vectors into the shared store at the
    /// current epoch, returning the total number of bytes ingested.
    async fn flush(&mut self) -> StorageResult<usize> {
        let buffer = self.mem_table.drain().into_parts();
        let mut kv_pairs = Vec::with_capacity(buffer.len());
        // When sanity checks are enabled, validate each op against the latest
        // flushed data (read at MAX_EPOCH).
        let sanity_check_read_snapshot = if sanity_check_enabled() {
            Some(self.inner.new_read_snapshot_impl(MAX_EPOCH, self.table_id))
        } else {
            None
        };
        for (key, key_op) in buffer {
            match key_op {
                // Currently, some executors do not strictly comply with these semantics. As
                // a workaround you may disable the check by initializing the
                // state store with `op_consistency_level=Inconsistent`.
                KeyOp::Insert(value) => {
                    if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
                        do_insert_sanity_check(
                            self.table_id,
                            &key,
                            &value,
                            sanity_check_read_snapshot,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, StorageValue::new_put(value)));
                }
                KeyOp::Delete(old_value) => {
                    if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
                        do_delete_sanity_check(
                            self.table_id,
                            &key,
                            &old_value,
                            sanity_check_read_snapshot,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, StorageValue::new_delete()));
                }
                KeyOp::Update((old_value, new_value)) => {
                    if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
                        do_update_sanity_check(
                            self.table_id,
                            &key,
                            &old_value,
                            &new_value,
                            sanity_check_read_snapshot,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, StorageValue::new_put(new_value)));
                }
            }
        }
        let epoch = self.epoch();
        self.inner
            .ingest_vectors(self.table_id, epoch, take(&mut self.vectors));
        self.inner
            .ingest_batch(kv_pairs, vec![], epoch, self.table_id)
    }

    /// Bind this local store to its first epoch; must be called exactly once.
    /// Also records the epoch transition in the shared per-table state.
    async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
        assert_eq!(
            self.epoch.replace(options.epoch),
            None,
            "epoch in local state store of table id {:?} is init for more than once",
            self.table_id
        );
        self.inner
            .tables
            .lock()
            .entry(self.table_id)
            .or_insert_with(|| TableState::new(options.epoch.prev))
            .next_epochs
            .insert(options.epoch.prev, options.epoch.curr);
        // With multiple vnodes, wait until sibling local stores have sealed the
        // previous epoch before serving this one.
        if self.vnodes.len() > 1 {
            TableState::wait_epoch(&self.inner.tables, self.table_id, options.epoch.prev).await;
        }

        Ok(())
    }

    /// Advance the local epoch to `next_epoch`, record per-vnode seal progress
    /// in the shared table state, and apply any table watermarks as delete
    /// ranges.
    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
        assert!(!self.mem_table.is_dirty());
        if let Some(value_checker) = opts.switch_op_consistency_level {
            self.mem_table.op_consistency_level.update(&value_checker);
        }
        let epoch = self
            .epoch
            .as_mut()
            .expect("should have init epoch before seal the first epoch");
        let prev_epoch = epoch.curr;
        epoch.prev = prev_epoch;
        epoch.curr = next_epoch;
        assert!(
            next_epoch > prev_epoch,
            "new epoch {} should be greater than current epoch: {}",
            next_epoch,
            prev_epoch
        );

        let mut tables = self.inner.tables.lock();
        let table_state = tables
            .get_mut(&self.table_id)
            .expect("should be set when init");

        table_state.next_epochs.insert(prev_epoch, next_epoch);
        if self.vnodes.len() > 1 {
            // Mark this store's vnodes as sealed for `prev_epoch`; the epoch is
            // fully sealed once every vnode has reported in.
            let sealing_epoch_vnodes = table_state
                .sealing_epochs
                .entry(prev_epoch)
                .or_insert_with(|| BitmapBuilder::zeroed(self.vnodes.len()));
            assert_eq!(self.vnodes.len(), sealing_epoch_vnodes.len());
            for vnode in self.vnodes.iter_ones() {
                assert!(!sealing_epoch_vnodes.is_set(vnode));
                sealing_epoch_vnodes.set(vnode, true);
            }
            if (0..self.vnodes.len()).all(|vnode| sealing_epoch_vnodes.is_set(vnode)) {
                // Epochs seal in order, so the newly fully-sealed epoch must be
                // the earliest pending one.
                let (all_sealed_epoch, _) =
                    table_state.sealing_epochs.pop_first().expect("non-empty");
                assert_eq!(
                    all_sealed_epoch, prev_epoch,
                    "new all_sealed_epoch must be the current prev epoch"
                );
                if let Some(prev_latest_sealed_epoch) =
                    table_state.latest_sealed_epoch.replace(prev_epoch)
                {
                    assert!(prev_epoch > prev_latest_sealed_epoch);
                }
            }
        }

        // Translate table watermarks into per-vnode delete ranges so that rows
        // behind the watermark disappear from subsequent reads.
        if let Some((direction, watermarks, _watermark_type)) = opts.table_watermarks {
            let delete_ranges = watermarks
                .iter()
                .flat_map(|vnode_watermark| {
                    let inner_range = match direction {
                        WatermarkDirection::Ascending => {
                            (Unbounded, Excluded(vnode_watermark.watermark().clone()))
                        }
                        WatermarkDirection::Descending => {
                            (Excluded(vnode_watermark.watermark().clone()), Unbounded)
                        }
                    };
                    vnode_watermark
                        .vnode_bitmap()
                        .iter_vnodes()
                        .map(move |vnode| {
                            let (start, end) =
                                prefixed_range_with_vnode(inner_range.clone(), vnode);
                            (start.map(|key| key.0), end.map(|key| key.0))
                        })
                })
                .collect_vec();
            if let Err(e) =
                self.inner
                    .ingest_batch(Vec::new(), delete_ranges, self.epoch(), self.table_id)
            {
                error!(error = %e.as_report(), "failed to write delete ranges of table watermark");
            }
        }
    }

    /// The in-memory backend needs no intermediate flushing.
    async fn try_flush(&mut self) -> StorageResult<()> {
        Ok(())
    }
}
1283
1284impl<R: RangeKv> StateStoreWriteVector for RangeKvLocalStateStore<R> {
1285    fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()> {
1286        self.vectors.push((vec.to_owned_scalar(), info));
1287        Ok(())
1288    }
1289}
1290
/// Forward iterator over a [`RangeKv`] store that exposes at most one version
/// (the newest visible at `epoch`) per user key and skips tombstones.
pub struct RangeKvStateStoreIter<R: RangeKv> {
    inner: batched_iter::Iter<R>,

    // Upper bound on visible epochs; `is_inclusive_epoch` controls whether a
    // version written exactly at `epoch` is visible.
    epoch: HummockEpoch,
    is_inclusive_epoch: bool,

    // Last user key emitted or skipped, used to de-duplicate versions.
    last_key: Option<UserKey<Bytes>>,

    // Row to be lent out by the next `try_next` call, if any.
    item_buffer: Option<StateStoreKeyedRow>,
}
1301
1302impl<R: RangeKv> RangeKvStateStoreIter<R> {
1303    pub fn new(
1304        inner: batched_iter::Iter<R>,
1305        epoch: HummockEpoch,
1306        is_inclusive_epoch: bool,
1307    ) -> Self {
1308        Self {
1309            inner,
1310            epoch,
1311            is_inclusive_epoch,
1312            last_key: None,
1313            item_buffer: None,
1314        }
1315    }
1316}
1317
1318impl<R: RangeKv> StateStoreIter for RangeKvStateStoreIter<R> {
1319    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
1320        self.next_inner()?;
1321        Ok(self
1322            .item_buffer
1323            .as_ref()
1324            .map(|(key, value)| (key.to_ref(), value.as_ref())))
1325    }
1326}
1327
impl<R: RangeKv> RangeKvStateStoreIter<R> {
    /// Advance to the next visible user key, leaving it in `item_buffer`
    /// (`None` once the underlying iterator is exhausted).
    fn next_inner(&mut self) -> StorageResult<()> {
        self.item_buffer = None;
        while let Some((key, value)) = self.inner.next()? {
            let epoch = key.epoch_with_gap.pure_epoch();
            // Skip versions newer than the read epoch.
            if epoch > self.epoch {
                continue;
            }
            // Exclusive mode also hides versions written exactly at the epoch.
            if epoch == self.epoch && !self.is_inclusive_epoch {
                continue;
            }
            // Only the first (newest visible) version of each user key counts;
            // `last_key` filters out older versions of the same key.
            if Some(key.user_key.as_ref()) != self.last_key.as_ref().map(|key| key.as_ref()) {
                self.last_key = Some(key.user_key.clone());
                // A `None` value is a tombstone: remember the key (so older
                // versions are skipped) but keep scanning for the next key.
                if let Some(value) = value {
                    self.item_buffer = Some((key, value));
                    break;
                }
            }
        }
        Ok(())
    }
}
1350
/// Reverse-order counterpart of [`RangeKvStateStoreIter`].
pub struct RangeKvStateStoreRevIter<R: RangeKv> {
    inner: batched_iter::Iter<R>,

    // Visibility bound, as in the forward iterator.
    epoch: HummockEpoch,
    is_inclusive_epoch: bool,

    // Staging area: the front holds the user key currently being resolved, the
    // back holds the completed entry lent out by `try_next`.
    item_buffer: VecDeque<StateStoreKeyedRow>,
}
1359
1360impl<R: RangeKv> RangeKvStateStoreRevIter<R> {
1361    pub fn new(
1362        inner: batched_iter::Iter<R>,
1363        epoch: HummockEpoch,
1364        is_inclusive_epoch: bool,
1365    ) -> Self {
1366        Self {
1367            inner,
1368            epoch,
1369            is_inclusive_epoch,
1370            item_buffer: VecDeque::default(),
1371        }
1372    }
1373}
1374
1375impl<R: RangeKv> StateStoreIter for RangeKvStateStoreRevIter<R> {
1376    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
1377        self.next_inner()?;
1378        Ok(self
1379            .item_buffer
1380            .back()
1381            .map(|(key, value)| (key.to_ref(), value.as_ref())))
1382    }
1383}
1384
impl<R: RangeKv> RangeKvStateStoreRevIter<R> {
    /// Advance to the next user key in reverse order.
    ///
    /// The in-progress user key is staged at the front of `item_buffer`, while
    /// the completed entry (lent out by `try_next`) sits at the back.
    /// NOTE(review): this relies on the ordering the inner batched iterator was
    /// constructed with (see `rev_iter_impl`) — confirm before changing.
    fn next_inner(&mut self) -> StorageResult<()> {
        // Drop the entry yielded by the previous call.
        self.item_buffer.pop_back();
        while let Some((key, value)) = self.inner.next()? {
            let epoch = key.epoch_with_gap.pure_epoch();
            // Skip versions newer than the read epoch.
            if epoch > self.epoch {
                continue;
            }
            // Exclusive mode also hides versions written exactly at the epoch.
            if epoch == self.epoch && !self.is_inclusive_epoch {
                continue;
            }

            let v = match value {
                Some(v) => v,
                None => {
                    // Tombstone: if it shadows the staged entry of the same
                    // user key, discard that entry.
                    if let Some(last_key) = self.item_buffer.front()
                        && key.user_key.as_ref() == last_key.0.user_key.as_ref()
                    {
                        self.item_buffer.clear();
                    }
                    continue;
                }
            };

            if let Some(last_key) = self.item_buffer.front() {
                if key.user_key.as_ref() != last_key.0.user_key.as_ref() {
                    // A new user key begins: the staged entry is complete and
                    // can be returned to the caller.
                    self.item_buffer.push_front((key, v));
                    break;
                } else {
                    // Another version of the same user key: replace the staged
                    // entry.
                    self.item_buffer.pop_front();
                    self.item_buffer.push_front((key, v));
                }
            } else {
                self.item_buffer.push_front((key, v));
            }
        }
        Ok(())
    }
}
1424
/// Merge-join of a new-value and an old-value iterator that produces change-log
/// (Insert/Delete/Update) entries per user key.
pub struct RangeKvStateStoreChangeLogIter<R: RangeKv> {
    new_value_iter: RangeKvStateStoreIter<R>,
    old_value_iter: RangeKvStateStoreIter<R>,
    // Entry lent out by the next `try_next` call, if any.
    item_buffer: Option<(TableKey<Bytes>, ChangeLogValue<Bytes>)>,
}
1430
1431impl<R: RangeKv> RangeKvStateStoreChangeLogIter<R> {
1432    fn new(
1433        mut new_value_iter: RangeKvStateStoreIter<R>,
1434        mut old_value_iter: RangeKvStateStoreIter<R>,
1435    ) -> StorageResult<Self> {
1436        new_value_iter.next_inner()?;
1437        old_value_iter.next_inner()?;
1438        Ok(Self {
1439            new_value_iter,
1440            old_value_iter,
1441            item_buffer: None,
1442        })
1443    }
1444}
1445
impl<R: RangeKv> StateStoreIter<StateStoreReadLogItem> for RangeKvStateStoreChangeLogIter<R> {
    /// Merge-join the two key-sorted iterators into change-log entries: a key
    /// with only a new value is an Insert, only an old value a Delete, and both
    /// (when different) an Update; identical values cancel out entirely.
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreReadLogItemRef<'_>>> {
        loop {
            match (
                &self.new_value_iter.item_buffer,
                &self.old_value_iter.item_buffer,
            ) {
                (None, None) => {
                    // Both sides exhausted.
                    self.item_buffer = None;
                    break;
                }
                (Some((key, new_value)), None) => {
                    self.item_buffer = Some((
                        key.user_key.table_key.clone(),
                        ChangeLogValue::Insert(new_value.clone()),
                    ));
                    self.new_value_iter.next_inner()?;
                }
                (None, Some((key, old_value))) => {
                    self.item_buffer = Some((
                        key.user_key.table_key.clone(),
                        ChangeLogValue::Delete(old_value.clone()),
                    ));
                    self.old_value_iter.next_inner()?;
                }
                (Some((new_value_key, new_value)), Some((old_value_key, old_value))) => {
                    // Classic merge-join: advance whichever side has the
                    // smaller key; equal keys are joined.
                    match new_value_key.user_key.cmp(&old_value_key.user_key) {
                        Ordering::Less => {
                            self.item_buffer = Some((
                                new_value_key.user_key.table_key.clone(),
                                ChangeLogValue::Insert(new_value.clone()),
                            ));
                            self.new_value_iter.next_inner()?;
                        }
                        Ordering::Greater => {
                            self.item_buffer = Some((
                                old_value_key.user_key.table_key.clone(),
                                ChangeLogValue::Delete(old_value.clone()),
                            ));
                            self.old_value_iter.next_inner()?;
                        }
                        Ordering::Equal => {
                            // Value unchanged across the epoch range: not a
                            // logical change, so skip the key on both sides.
                            if new_value == old_value {
                                self.new_value_iter.next_inner()?;
                                self.old_value_iter.next_inner()?;
                                continue;
                            }
                            self.item_buffer = Some((
                                new_value_key.user_key.table_key.clone(),
                                ChangeLogValue::Update {
                                    new_value: new_value.clone(),
                                    old_value: old_value.clone(),
                                },
                            ));
                            self.new_value_iter.next_inner()?;
                            self.old_value_iter.next_inner()?;
                        }
                    }
                }
            };
            break;
        }
        Ok(self
            .item_buffer
            .as_ref()
            .map(|(key, value)| (key.to_ref(), value.to_ref())))
    }
}
1514
1515#[cfg(test)]
1516mod tests {
1517    use risingwave_common::util::epoch::test_epoch;
1518
1519    use super::*;
1520    use crate::hummock::iterator::test_utils::{
1521        iterator_test_table_key_of, iterator_test_value_of,
1522    };
1523    use crate::hummock::test_utils::{ReadOptions, *};
1524    use crate::memory::sled::SledStateStore;
1525
    // Snapshot isolation over the pure in-memory backend.
    #[tokio::test]
    async fn test_snapshot_isolation_memory() {
        let state_store = MemoryStateStore::new();
        test_snapshot_isolation_inner(state_store).await;
    }
1531
    // Same scenario against the sled-backed store (not built under madsim).
    #[cfg(not(madsim))]
    #[tokio::test]
    async fn test_snapshot_isolation_sled() {
        let state_store = SledStateStore::new_temp();
        test_snapshot_isolation_inner(state_store).await;
    }
1538
    // Shared scenario: write two epochs to the same keys and verify that scans
    // and point gets at each epoch see exactly that epoch's data.
    async fn test_snapshot_isolation_inner(state_store: RangeKvStateStore<impl RangeKv>) {
        // Epoch 0: a -> v1, b -> v1.
        state_store
            .ingest_batch(
                vec![
                    (
                        TableKey(Bytes::from(b"a".to_vec())),
                        StorageValue::new_put(b"v1".to_vec()),
                    ),
                    (
                        TableKey(Bytes::from(b"b".to_vec())),
                        StorageValue::new_put(b"v1".to_vec()),
                    ),
                ],
                vec![],
                0,
                Default::default(),
            )
            .unwrap();
        // Epoch 1: a -> v2, b deleted.
        state_store
            .ingest_batch(
                vec![
                    (
                        TableKey(Bytes::from(b"a".to_vec())),
                        StorageValue::new_put(b"v2".to_vec()),
                    ),
                    (
                        TableKey(Bytes::from(b"b".to_vec())),
                        StorageValue::new_delete(),
                    ),
                ],
                vec![],
                test_epoch(1),
                Default::default(),
            )
            .unwrap();
        // Scan at epoch 0 still sees both keys with their v1 values.
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    0,
                    TableId::default(),
                    None,
                )
                .unwrap(),
            vec![
                (
                    FullKey::for_test(Default::default(), Bytes::from("a"), 0)
                        .encode()
                        .into(),
                    b"v1".to_vec().into()
                ),
                (
                    FullKey::for_test(Default::default(), Bytes::from("b"), 0)
                        .encode()
                        .into(),
                    b"v1".to_vec().into()
                )
            ]
        );
        // A limit of 1 truncates the scan to the first key.
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    0,
                    TableId::default(),
                    Some(1),
                )
                .unwrap(),
            vec![(
                FullKey::for_test(Default::default(), b"a".to_vec(), 0)
                    .encode()
                    .into(),
                b"v1".to_vec().into()
            )]
        );
        // Scan at epoch 1: `a` shows v2 and the deleted `b` is gone.
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    test_epoch(1),
                    TableId::default(),
                    None,
                )
                .unwrap(),
            vec![(
                FullKey::for_test(Default::default(), b"a".to_vec(), test_epoch(1))
                    .encode()
                    .into(),
                b"v2".to_vec().into()
            )]
        );
        // Point gets at epoch 0.
        assert_eq!(
            state_store
                .get(TableKey(Bytes::from("a")), 0, ReadOptions::default())
                .await
                .unwrap(),
            Some(Bytes::from("v1"))
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"b")),
                    0,
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            Some(b"v1".to_vec().into())
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"c")),
                    0,
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            None
        );
        // Point gets at epoch 1: updated `a`, deleted `b`, never-written `c`.
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"a")),
                    test_epoch(1),
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            Some(b"v2".to_vec().into())
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::from("b")),
                    test_epoch(1),
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            None
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::from("c")),
                    test_epoch(1),
                    ReadOptions::default()
                )
                .await
                .unwrap(),
            None
        );
    }
1702
    // Change-log iteration over the pure in-memory backend.
    #[tokio::test]
    async fn test_iter_log_memory() {
        let state_store = MemoryStateStore::new();
        test_iter_log_inner(state_store).await;
    }
1708
    // Change-log iteration against the sled-backed store (not built under
    // madsim).
    #[cfg(not(madsim))]
    #[tokio::test]
    async fn test_iter_log_sled() {
        let state_store = SledStateStore::new_temp();
        test_iter_log_inner(state_store).await;
    }
1715
1716    async fn test_iter_log_inner(state_store: RangeKvStateStore<impl RangeKv>) {
1717        let table_id = TableId::new(233);
1718        let epoch1 = test_epoch(1);
1719        let key_idx = [1, 2, 4];
1720        let make_key = |i| TableKey(Bytes::from(iterator_test_table_key_of(i)));
1721        let make_value = |i| Bytes::from(iterator_test_value_of(i));
1722        state_store
1723            .ingest_batch(
1724                key_idx
1725                    .iter()
1726                    .map(|i| (make_key(*i), StorageValue::new_put(make_value(*i))))
1727                    .collect(),
1728                vec![],
1729                epoch1,
1730                table_id,
1731            )
1732            .unwrap();
1733        {
1734            let mut iter = state_store
1735                .iter_log(
1736                    (epoch1, epoch1),
1737                    (Unbounded, Unbounded),
1738                    ReadLogOptions { table_id },
1739                )
1740                .await
1741                .unwrap();
1742            for i in key_idx {
1743                let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
1744                assert_eq!(make_key(i).to_ref(), iter_key);
1745                assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
1746            }
1747            assert!(iter.try_next().await.unwrap().is_none());
1748        }
1749
1750        let epoch2 = test_epoch(2);
1751        state_store
1752            .ingest_batch(
1753                vec![
1754                    (make_key(1), StorageValue::new_put(make_value(12))), // update
1755                    (make_key(2), StorageValue::new_delete()),            // delete
1756                    (make_key(3), StorageValue::new_put(make_value(3))),
1757                ],
1758                vec![],
1759                epoch2,
1760                table_id,
1761            )
1762            .unwrap();
1763
1764        // check iter log between two epoch
1765        {
1766            let expected = vec![
1767                (
1768                    make_key(1),
1769                    ChangeLogValue::Update {
1770                        new_value: make_value(12),
1771                        old_value: make_value(1),
1772                    },
1773                ),
1774                (make_key(2), ChangeLogValue::Delete(make_value(2))),
1775                (make_key(3), ChangeLogValue::Insert(make_value(3))),
1776            ];
1777            let mut iter = state_store
1778                .iter_log(
1779                    (epoch2, epoch2),
1780                    (Unbounded, Unbounded),
1781                    ReadLogOptions { table_id },
1782                )
1783                .await
1784                .unwrap();
1785            for (key, change_log_value) in expected {
1786                let (iter_key, iter_value) = iter.try_next().await.unwrap().unwrap();
1787                assert_eq!(
1788                    key.to_ref(),
1789                    iter_key,
1790                    "{:?} {:?}",
1791                    change_log_value.to_ref(),
1792                    iter_value
1793                );
1794                assert_eq!(change_log_value.to_ref(), iter_value);
1795            }
1796            assert!(iter.try_next().await.unwrap().is_none());
1797        }
1798        // check iter log on the original old epoch
1799        {
1800            let mut iter = state_store
1801                .iter_log(
1802                    (epoch1, epoch1),
1803                    (Unbounded, Unbounded),
1804                    ReadLogOptions { table_id },
1805                )
1806                .await
1807                .unwrap();
1808            for i in key_idx {
1809                let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
1810                assert_eq!(make_key(i).to_ref(), iter_key);
1811                assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
1812            }
1813            assert!(iter.try_next().await.unwrap().is_none());
1814        }
1815        // check iter on merging the two epochs
1816        {
1817            let mut iter = state_store
1818                .iter_log(
1819                    (epoch1, epoch2),
1820                    (Unbounded, Unbounded),
1821                    ReadLogOptions { table_id },
1822                )
1823                .await
1824                .unwrap();
1825            let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
1826            assert_eq!(make_key(1).to_ref(), iter_key);
1827            assert_eq!(
1828                change_value,
1829                ChangeLogValue::Insert(make_value(12).as_ref())
1830            );
1831            for i in [3, 4] {
1832                let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
1833                assert_eq!(make_key(i).to_ref(), iter_key);
1834                assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
1835            }
1836            assert!(iter.try_next().await.unwrap().is_none());
1837        }
1838    }
1839}