risingwave_storage/memory.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::mem::take;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::ops::{Bound, RangeBounds};
use std::sync::{Arc, LazyLock};

use bytes::Bytes;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::dispatch_distance_measurement;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::util::epoch::{EpochPair, MAX_EPOCH};
use risingwave_hummock_sdk::key::{
    FullKey, TableKey, TableKeyRange, UserKey, prefixed_range_with_vnode,
};
use risingwave_hummock_sdk::table_watermark::WatermarkDirection;
use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
use thiserror_ext::AsReport;
use tokio::task::yield_now;
use tracing::error;

use crate::error::StorageResult;
use crate::hummock::HummockError;
use crate::hummock::utils::{
    do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, merge_stream,
    sanity_check_enabled,
};
use crate::mem_table::{KeyOp, MemTable};
use crate::storage_value::StorageValue;
use crate::store::*;
use crate::vector::{MeasureDistanceBuilder, NearestBuilder};

pub type BytesFullKey = FullKey<Bytes>;
pub type BytesFullKeyRange = (Bound<BytesFullKey>, Bound<BytesFullKey>);

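/// Key-value backend abstraction over an ordered map from [`BytesFullKey`] to an
/// optional value, where `None` encodes a tombstone (a deleted key). Implementations
/// are cheap-to-clone handles to shared state; see [`BTreeMapRangeKv`] for the default
/// in-memory backend and the [`sled`] module for a persistent one.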
#[allow(clippy::type_complexity)]
pub trait RangeKv: Clone + Send + Sync + 'static {
    fn range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>>;

    fn rev_range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>>;

    fn ingest_batch(
        &self,
        kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
    ) -> StorageResult<()>;

    fn flush(&self) -> StorageResult<()>;
}

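/// The default in-memory backend: a shared, lock-protected `BTreeMap`. Deletions are
/// stored as `None` tombstones and filtered out at read time by the iterators above.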
pub type BTreeMapRangeKv = Arc<RwLock<BTreeMap<BytesFullKey, Option<Bytes>>>>;

impl RangeKv for BTreeMapRangeKv {
    fn range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
        let limit = limit.unwrap_or(usize::MAX);
        Ok(self
            .read()
            .range(range)
            .take(limit)
            .map(|(key, value)| (key.clone(), value.clone()))
            .collect())
    }

    fn rev_range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
        let limit = limit.unwrap_or(usize::MAX);
        Ok(self
            .read()
            .range(range)
            .rev()
            .take(limit)
            .map(|(key, value)| (key.clone(), value.clone()))
            .collect())
    }

    fn ingest_batch(
        &self,
        kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
    ) -> StorageResult<()> {
        let mut inner = self.write();
        for (key, value) in kv_pairs {
            inner.insert(key, value);
        }
        Ok(())
    }

    fn flush(&self) -> StorageResult<()> {
        Ok(())
    }
}

pub mod sled {
    use std::fs::create_dir_all;
    use std::ops::RangeBounds;

    use bytes::Bytes;
    use risingwave_hummock_sdk::key::FullKey;

    use crate::error::StorageResult;
    use crate::memory::{BytesFullKey, BytesFullKeyRange, RangeKv, RangeKvStateStore};

    #[derive(Clone)]
    pub struct SledRangeKv {
        inner: sled::Db,
    }

    impl SledRangeKv {
        pub fn new(path: impl AsRef<std::path::Path>) -> Self {
            SledRangeKv {
                inner: sled::open(path).expect("open"),
            }
        }

        pub fn new_temp() -> Self {
            create_dir_all("./.risingwave/sled").expect("should create");
            let path = tempfile::TempDir::new_in("./.risingwave/sled")
                .expect("find temp dir")
                .keep();
            Self::new(path)
        }
    }

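    // Value encoding: one tag byte followed by the payload. `EMPTY` marks a tombstone
    // (deleted key); `NON_EMPTY` is followed by the raw value bytes.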
    const EMPTY: u8 = 1;
    const NON_EMPTY: u8 = 0;

    impl RangeKv for SledRangeKv {
        fn range(
            &self,
            range: BytesFullKeyRange,
            limit: Option<usize>,
        ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
            let (left, right) = range;
            let full_key_ref_bound = (
                left.as_ref().map(FullKey::to_ref),
                right.as_ref().map(FullKey::to_ref),
            );
            let left_encoded = left.as_ref().map(|key| key.to_ref().encode_reverse_epoch());
            let right_encoded = right
                .as_ref()
                .map(|key| key.to_ref().encode_reverse_epoch());
            let limit = limit.unwrap_or(usize::MAX);
            let mut ret = vec![];
            for result in self.inner.range((left_encoded, right_encoded)).take(limit) {
                let (key, value) = result?;
                let full_key = FullKey::decode_reverse_epoch(key.as_ref()).copy_into();
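                // The encoded byte range can admit keys outside the typed `FullKey`
                // range when table keys have different lengths (see
                // `test_filter_variable_key_length_false_positive` below), so re-check
                // against the original bounds.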
                if !full_key_ref_bound.contains(&full_key.to_ref()) {
                    continue;
                }
                let value = match value.as_ref() {
                    [EMPTY] => None,
                    [NON_EMPTY, rest @ ..] => Some(Bytes::from(Vec::from(rest))),
                    _ => unreachable!("malformed value: {:?}", value),
                };
                ret.push((full_key, value))
            }
            Ok(ret)
        }

        fn rev_range(
            &self,
            range: BytesFullKeyRange,
            limit: Option<usize>,
        ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
            let (left, right) = range;
            let full_key_ref_bound = (
                left.as_ref().map(FullKey::to_ref),
                right.as_ref().map(FullKey::to_ref),
            );
            let left_encoded = left.as_ref().map(|key| key.to_ref().encode_reverse_epoch());
            let right_encoded = right
                .as_ref()
                .map(|key| key.to_ref().encode_reverse_epoch());
            let limit = limit.unwrap_or(usize::MAX);
            let mut ret = vec![];
            for result in self
                .inner
                .range((left_encoded, right_encoded))
                .rev()
                .take(limit)
            {
                let (key, value) = result?;
                let full_key = FullKey::decode_reverse_epoch(key.as_ref()).copy_into();
                if !full_key_ref_bound.contains(&full_key.to_ref()) {
                    continue;
                }
                let value = match value.as_ref() {
                    [EMPTY] => None,
                    [NON_EMPTY, rest @ ..] => Some(Bytes::from(Vec::from(rest))),
                    _ => unreachable!("malformed value: {:?}", value),
                };
                ret.push((full_key, value))
            }
            Ok(ret)
        }

        fn ingest_batch(
            &self,
            kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
        ) -> StorageResult<()> {
            let mut batch = sled::Batch::default();
            for (key, value) in kv_pairs {
                let encoded_key = key.encode_reverse_epoch();
                let key = sled::IVec::from(encoded_key);
                let mut buffer =
                    Vec::with_capacity(value.as_ref().map(|v| v.len()).unwrap_or_default() + 1);
                if let Some(value) = value {
                    buffer.push(NON_EMPTY);
                    buffer.extend_from_slice(value.as_ref());
                } else {
                    buffer.push(EMPTY);
                }
                let value = sled::IVec::from(buffer);
                batch.insert(key, value);
            }
            self.inner.apply_batch(batch)?;
            Ok(())
        }

        fn flush(&self) -> StorageResult<()> {
            Ok(self.inner.flush().map(|_| {})?)
        }
    }

    pub type SledStateStore = RangeKvStateStore<SledRangeKv>;

    impl SledStateStore {
        pub fn new(path: impl AsRef<std::path::Path>) -> Self {
            RangeKvStateStore {
                inner: SledRangeKv::new(path),
                tables: Default::default(),
                vectors: Default::default(),
            }
        }

        pub fn new_temp() -> Self {
            RangeKvStateStore {
                inner: SledRangeKv::new_temp(),
                tables: Default::default(),
                vectors: Default::default(),
            }
        }
    }

    #[cfg(test)]
    mod test {
        use std::ops::{Bound, RangeBounds};

        use bytes::Bytes;
        use risingwave_common::catalog::TableId;
        use risingwave_common::util::epoch::EPOCH_SPILL_TIME_MASK;
        use risingwave_hummock_sdk::EpochWithGap;
        use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};

        use crate::memory::RangeKv;
        use crate::memory::sled::SledRangeKv;

        #[test]
        fn test_filter_variable_key_length_false_positive() {
            let table_id = TableId { table_id: 233 };
            let epoch = u64::MAX - u64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]);
            let excluded_short_table_key = [0, 1, 0, 0];
            let included_long_table_key = [0, 1, 0, 0, 1, 2];
            let left_table_key = [0, 1, 0, 0, 1];
            let right_table_key = [0, 1, 1, 1];

            let to_full_key = |table_key: &[u8]| FullKey {
                user_key: UserKey {
                    table_id,
                    table_key: TableKey(Bytes::from(table_key.to_vec())),
                },
                epoch_with_gap: EpochWithGap::new_from_epoch(epoch & !EPOCH_SPILL_TIME_MASK),
            };

            let left_full_key = to_full_key(&left_table_key[..]);
            let right_full_key = to_full_key(&right_table_key[..]);
            let included_long_full_key = to_full_key(&included_long_table_key[..]);
            let excluded_short_full_key = to_full_key(&excluded_short_table_key[..]);

            assert!(
                (
                    Bound::Included(left_full_key.to_ref()),
                    Bound::Included(right_full_key.to_ref())
                )
                    .contains(&included_long_full_key.to_ref())
            );
            assert!(
                !(
                    Bound::Included(left_full_key.to_ref()),
                    Bound::Included(right_full_key.to_ref())
                )
                    .contains(&excluded_short_full_key.to_ref())
            );

            let left_encoded = left_full_key.encode_reverse_epoch();
            let right_encoded = right_full_key.encode_reverse_epoch();

            assert!(
                (
                    Bound::Included(left_encoded.clone()),
                    Bound::Included(right_encoded.clone())
                )
                    .contains(&included_long_full_key.encode_reverse_epoch())
            );
            assert!(
                (
                    Bound::Included(left_encoded),
                    Bound::Included(right_encoded)
                )
                    .contains(&excluded_short_full_key.encode_reverse_epoch())
            );

            let sled_range_kv = SledRangeKv::new_temp();
            sled_range_kv
                .ingest_batch(
                    vec![
                        (included_long_full_key.clone(), None),
                        (excluded_short_full_key, None),
                    ]
                    .into_iter(),
                )
                .unwrap();
            let kvs = sled_range_kv
                .range(
                    (
                        Bound::Included(left_full_key),
                        Bound::Included(right_full_key),
                    ),
                    None,
                )
                .unwrap();
            assert_eq!(1, kvs.len());
            assert_eq!(included_long_full_key.to_ref(), kvs[0].0.to_ref());
            assert!(kvs[0].1.is_none());
        }
    }
}

mod batched_iter {

    use super::*;

    /// A utility struct for iterating over a range of keys in a locked `BTreeMap`. It fetches
    /// records in batches, trading some copying overhead for fewer lock acquisitions.
    ///
    /// Because the lock is released between batches, iteration is not guaranteed to observe a
    /// consistent snapshot of the map. Users should handle MVCC by themselves.
    pub struct Iter<R: RangeKv> {
        inner: R,
        range: BytesFullKeyRange,
        current: std::vec::IntoIter<(FullKey<Bytes>, Option<Bytes>)>,
        rev: bool,
    }

    impl<R: RangeKv> Iter<R> {
        pub fn new(inner: R, range: BytesFullKeyRange, rev: bool) -> Self {
            Self {
                inner,
                range,
                rev,
                current: Vec::new().into_iter(),
            }
        }
    }

    impl<R: RangeKv> Iter<R> {
        const BATCH_SIZE: usize = 256;

        /// Get the next batch of records and fill the `current` buffer.
        fn refill(&mut self) -> StorageResult<()> {
            assert!(self.current.is_empty());

            let batch = if self.rev {
                self.inner.rev_range(
                    (self.range.0.clone(), self.range.1.clone()),
                    Some(Self::BATCH_SIZE),
                )?
            } else {
                self.inner.range(
                    (self.range.0.clone(), self.range.1.clone()),
                    Some(Self::BATCH_SIZE),
                )?
            };

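            // Advance the corresponding range bound past the last key fetched, so the
            // next `refill` resumes right after this batch instead of re-reading it.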
            if let Some((last_key, _)) = batch.last() {
                let full_key = FullKey::new_with_gap_epoch(
                    last_key.user_key.table_id,
                    TableKey(last_key.user_key.table_key.0.clone()),
                    last_key.epoch_with_gap,
                );
                if self.rev {
                    self.range.1 = Bound::Excluded(full_key);
                } else {
                    self.range.0 = Bound::Excluded(full_key);
                }
            }
            self.current = batch.into_iter();
            Ok(())
        }
    }

    impl<R: RangeKv> Iter<R> {
        #[allow(clippy::type_complexity)]
        pub fn next(&mut self) -> StorageResult<Option<(BytesFullKey, Option<Bytes>)>> {
            match self.current.next() {
                Some((key, value)) => Ok(Some((key, value))),
                None => {
                    self.refill()?;
                    Ok(self.current.next())
                }
            }
        }
    }

    #[cfg(test)]
    mod tests {
        use rand::Rng;

        use super::*;
        use crate::memory::sled::SledRangeKv;

        #[test]
        fn test_btreemap_iter_chaos() {
            let map = Arc::new(RwLock::new(BTreeMap::new()));
            test_iter_chaos_inner(map, 1000);
        }

        #[cfg(not(madsim))]
        #[test]
        fn test_sled_iter_chaos() {
            let map = SledRangeKv::new_temp();
            test_iter_chaos_inner(map, 100);
        }

        fn test_iter_chaos_inner(map: impl RangeKv, count: usize) {
            let key_range = 1..=10000;
            let num_to_bytes = |k: i32| Bytes::from(format!("{:06}", k).as_bytes().to_vec());
            let num_to_full_key =
                |k: i32| FullKey::new(TableId::default(), TableKey(num_to_bytes(k)), 0);
            #[allow(clippy::mutable_key_type)]
            map.ingest_batch(key_range.clone().map(|k| {
                let key = num_to_full_key(k);
                let b = key.user_key.table_key.0.clone();

                (key, Some(b))
            }))
            .unwrap();

            let rand_bound = || {
                let key = rand::rng().random_range(key_range.clone());
                let key = num_to_full_key(key);
                match rand::rng().random_range(1..=5) {
                    1 | 2 => Bound::Included(key),
                    3 | 4 => Bound::Excluded(key),
                    _ => Bound::Unbounded,
                }
            };

            for _ in 0..count {
                let range = loop {
                    let range = (rand_bound(), rand_bound());
                    let (start, end) = (range.start_bound(), range.end_bound());

                    // Filter out invalid ranges. Code migrated from `BTreeMap::range`.
                    match (start, end) {
                        (Bound::Excluded(s), Bound::Excluded(e)) if s == e => {
                            continue;
                        }
                        (
                            Bound::Included(s) | Bound::Excluded(s),
                            Bound::Included(e) | Bound::Excluded(e),
                        ) if s > e => {
                            continue;
                        }
                        _ => break range,
                    }
                };

                let v1 = {
                    let mut v = vec![];
                    let mut iter = Iter::new(map.clone(), range.clone(), false);
                    while let Some((key, value)) = iter.next().unwrap() {
                        v.push((key, value));
                    }
                    v
                };
                let v2 = map.range(range, None).unwrap();

                // Items iterated from the batched iterator should be the same as from the normal iterator.
                assert_eq!(v1, v2);
            }
        }
    }
}

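/// An in-memory state store for tests, backed by [`BTreeMapRangeKv`].
///
/// A minimal usage sketch (illustrative only; `ingest_batch` is crate-private, and the
/// epoch and table id below are placeholder values):
///
/// ```ignore
/// let store = MemoryStateStore::new();
/// store.ingest_batch(
///     vec![(TableKey(Bytes::from("key")), StorageValue::new_put(Bytes::from("value")))],
///     vec![],              // no delete ranges
///     0,                   // epoch
///     TableId::default(),
/// )?;
/// ```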
pub type MemoryStateStore = RangeKvStateStore<BTreeMapRangeKv>;

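/// Per-table epoch bookkeeping used to emulate the epoch sealing/waiting semantics of
/// the real Hummock store: `next_epochs` maps `prev_epoch -> curr_epoch`, and
/// `sealing_epochs` tracks which vnodes have sealed each pending epoch.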
struct TableState {
    init_epoch: u64,
    next_epochs: BTreeMap<u64, u64>,
    latest_sealed_epoch: Option<u64>,
    sealing_epochs: BTreeMap<u64, BitmapBuilder>,
}

impl TableState {
    fn new(init_epoch: u64) -> Self {
        Self {
            init_epoch,
            next_epochs: Default::default(),
            latest_sealed_epoch: None,
            sealing_epochs: Default::default(),
        }
    }

    async fn wait_epoch(
        tables: &parking_lot::Mutex<HashMap<TableId, Self>>,
        table_id: TableId,
        epoch: u64,
    ) {
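        // Poll (with cooperative yields) until the epoch is sealed for this table; the
        // in-memory store has no notification mechanism, so busy-waiting suffices here.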
        loop {
            {
                let tables = tables.lock();
                let table_state = tables.get(&table_id).expect("should exist");
                assert!(epoch >= table_state.init_epoch);
                if epoch == table_state.init_epoch {
                    return;
                }
                if let Some(latest_sealed_epoch) = table_state.latest_sealed_epoch
                    && latest_sealed_epoch >= epoch
                {
                    return;
                }
            }
            yield_now().await;
        }
    }
}

type InMemVectorStore = Arc<RwLock<HashMap<TableId, Vec<(Vector, Bytes, u64)>>>>;

/// An in-memory state store
///
/// The in-memory state store is a [`BTreeMap`], which maps [`FullKey`] to a value. It
/// never does GC, so its memory usage keeps growing. Therefore, the in-memory state
/// store should never be used in production.
#[derive(Clone, Default)]
pub struct RangeKvStateStore<R: RangeKv> {
    /// Stores (key, epoch) -> user value.
    inner: R,
    /// `table_id` -> `prev_epoch` -> `curr_epoch`
    tables: Arc<parking_lot::Mutex<HashMap<TableId, TableState>>>,

    vectors: InMemVectorStore,
}

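/// Converts a per-table key range into a [`BytesFullKeyRange`]. Within one user key,
/// [`FullKey`] orders larger epochs first, so the smallest full key for a user key
/// carries `HummockEpoch::MAX` and the largest carries epoch `0`; the bounds below are
/// chosen so that every epoch version of a boundary key is included or excluded as the
/// table-key bound dictates.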
fn to_full_key_range<R, B>(table_id: TableId, table_key_range: R) -> BytesFullKeyRange
where
    R: RangeBounds<B> + Send,
    B: AsRef<[u8]>,
{
    let start = match table_key_range.start_bound() {
        Included(k) => Included(FullKey::new(
            table_id,
            TableKey(Bytes::from(k.as_ref().to_vec())),
            HummockEpoch::MAX,
        )),
        Excluded(k) => Excluded(FullKey::new(
            table_id,
            TableKey(Bytes::from(k.as_ref().to_vec())),
            0,
        )),
        Unbounded => Included(FullKey::new(
            table_id,
            TableKey(Bytes::from(b"".to_vec())),
            HummockEpoch::MAX,
        )),
    };
    let end = match table_key_range.end_bound() {
        Included(k) => Included(FullKey::new(
            table_id,
            TableKey(Bytes::from(k.as_ref().to_vec())),
            0,
        )),
        Excluded(k) => Excluded(FullKey::new(
            table_id,
            TableKey(Bytes::from(k.as_ref().to_vec())),
            HummockEpoch::MAX,
        )),
        Unbounded => {
            if let Some(next_table_id) = table_id.table_id().checked_add(1) {
                Excluded(FullKey::new(
                    next_table_id.into(),
                    TableKey(Bytes::from(b"".to_vec())),
                    HummockEpoch::MAX,
                ))
            } else {
                Unbounded
            }
        }
    };
    (start, end)
}

impl MemoryStateStore {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn shared() -> Self {
        static STORE: LazyLock<MemoryStateStore> = LazyLock::new(MemoryStateStore::new);
        STORE.clone()
    }
}

impl<R: RangeKv> RangeKvStateStore<R> {
    fn scan(
        &self,
        key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(Bytes, Bytes)>> {
        let mut data = vec![];
        if limit == Some(0) {
            return Ok(vec![]);
        }
        let mut last_user_key = None;
        for (key, value) in self
            .inner
            .range(to_full_key_range(table_id, key_range), None)?
        {
            if key.epoch_with_gap.pure_epoch() > epoch {
                continue;
            }
            if Some(&key.user_key) != last_user_key.as_ref() {
                if let Some(value) = value {
                    data.push((Bytes::from(key.encode()), value.clone()));
                }
                last_user_key = Some(key.user_key.clone());
            }
            if let Some(limit) = limit
                && data.len() >= limit
            {
                break;
            }
        }
        Ok(data)
    }
}

#[derive(Clone)]
pub struct RangeKvStateStoreReadSnapshot<R: RangeKv> {
    inner: RangeKvStateStore<R>,
    epoch: u64,
    table_id: TableId,
}

impl<R: RangeKv> StateStoreGet for RangeKvStateStoreReadSnapshot<R> {
    async fn on_key_value<O: Send + 'static>(
        &self,
        key: TableKey<Bytes>,
        _read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        self.inner
            .get_keyed_row_impl(key, self.epoch, self.table_id)
            .and_then(|option| {
                if let Some((key, value)) = option {
                    on_key_value_fn(key.to_ref(), value.as_ref()).map(Some)
                } else {
                    Ok(None)
                }
            })
    }
}

impl<R: RangeKv> StateStoreRead for RangeKvStateStoreReadSnapshot<R> {
    type Iter = RangeKvStateStoreIter<R>;
    type RevIter = RangeKvStateStoreRevIter<R>;

    async fn iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::Iter> {
        self.inner.iter_impl(key_range, self.epoch, self.table_id)
    }

    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter> {
        self.inner
            .rev_iter_impl(key_range, self.epoch, self.table_id)
    }
}

impl<R: RangeKv> StateStoreReadVector for RangeKvStateStoreReadSnapshot<R> {
    async fn nearest<O: Send + 'static>(
        &self,
        vec: Vector,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<O>,
    ) -> StorageResult<Vec<O>> {
        fn nearest_impl<M: MeasureDistanceBuilder, O>(
            store: &InMemVectorStore,
            epoch: u64,
            table_id: TableId,
            vec: Vector,
            options: VectorNearestOptions,
            on_nearest_item_fn: impl OnNearestItemFn<O>,
        ) -> Vec<O> {
            let mut builder = NearestBuilder::<'_, O, M>::new(vec.to_ref(), options.top_n);
            builder.add(
                store
                    .read()
                    .get(&table_id)
                    .map(|vec| vec.iter())
                    .into_iter()
                    .flatten()
                    .filter(|(_, _, vector_epoch)| epoch >= *vector_epoch)
                    .map(|(vec, info, _)| (vec.to_ref(), info.as_ref())),
                on_nearest_item_fn,
            );
            builder.finish()
        }
        dispatch_distance_measurement!(options.measure, MeasurementType, {
            Ok(nearest_impl::<MeasurementType, O>(
                &self.inner.vectors,
                self.epoch,
                self.table_id,
                vec,
                options,
                on_nearest_item_fn,
            ))
        })
    }
}

impl<R: RangeKv> RangeKvStateStore<R> {
    fn get_keyed_row_impl(
        &self,
        key: TableKey<Bytes>,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<Option<StateStoreKeyedRow>> {
        let range_bounds = (Bound::Included(key.clone()), Bound::Included(key));
        // We do not really care about vnodes here, so we just use the default value.
        let res = self.scan(range_bounds, epoch, table_id, Some(1))?;

        Ok(match res.as_slice() {
            [] => None,
            [(key, value)] => Some((
                FullKey::decode(key.as_ref()).to_vec().into_bytes(),
                value.clone(),
            )),
            _ => unreachable!(),
        })
    }

    fn iter_impl(
        &self,
        key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<RangeKvStateStoreIter<R>> {
        Ok(RangeKvStateStoreIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(table_id, key_range),
                false,
            ),
            epoch,
            true,
        ))
    }

    fn rev_iter_impl(
        &self,
        key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<RangeKvStateStoreRevIter<R>> {
        Ok(RangeKvStateStoreRevIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(table_id, key_range),
                true,
            ),
            epoch,
            true,
        ))
    }
}

impl<R: RangeKv> StateStoreReadLog for RangeKvStateStore<R> {
    type ChangeLogIter = RangeKvStateStoreChangeLogIter<R>;

    async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
        loop {
            {
                let tables = self.tables.lock();
                let Some(tables) = tables.get(&options.table_id) else {
                    return Err(HummockError::next_epoch(format!(
                        "table {} does not exist",
                        options.table_id
                    ))
                    .into());
                };
                if let Some(next_epoch) = tables.next_epochs.get(&epoch) {
                    break Ok(*next_epoch);
                }
            }
            yield_now().await;
        }
    }

    async fn iter_log(
        &self,
        (min_epoch, max_epoch): (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> StorageResult<Self::ChangeLogIter> {
        let new_value_iter = RangeKvStateStoreIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(options.table_id, key_range.clone()),
                false,
            ),
            max_epoch,
            true,
        );
        let old_value_iter = RangeKvStateStoreIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(options.table_id, key_range),
                false,
            ),
            min_epoch,
            false,
        );
        RangeKvStateStoreChangeLogIter::new(new_value_iter, old_value_iter)
    }
}

impl<R: RangeKv> RangeKvStateStore<R> {
    fn new_read_snapshot_impl(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> RangeKvStateStoreReadSnapshot<R> {
        RangeKvStateStoreReadSnapshot {
            inner: self.clone(),
            epoch,
            table_id,
        }
    }

    pub(crate) fn ingest_batch(
        &self,
        mut kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
        delete_ranges: Vec<(Bound<Bytes>, Bound<Bytes>)>,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<usize> {
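        // Materialize range deletes as per-key tombstones: scan the existing keys in
        // each delete range and enqueue a delete for every distinct table key found.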
        let mut delete_keys = BTreeSet::new();
        for del_range in delete_ranges {
            for (key, _) in self.inner.range(
                (
                    del_range
                        .0
                        .map(|table_key| FullKey::new(table_id, TableKey(table_key), epoch)),
                    del_range
                        .1
                        .map(|table_key| FullKey::new(table_id, TableKey(table_key), epoch)),
                ),
                None,
            )? {
                delete_keys.insert(key.user_key.table_key);
            }
        }
        for key in delete_keys {
            kv_pairs.push((key, StorageValue::new_delete()));
        }

        let mut size = 0;
        self.inner
            .ingest_batch(kv_pairs.into_iter().map(|(key, value)| {
                size += key.len() + value.size();
                (FullKey::new(table_id, key, epoch), value.user_value)
            }))?;
        Ok(size)
    }

    fn ingest_vectors(&self, table_id: TableId, epoch: u64, vecs: Vec<(Vector, Bytes)>) {
        self.vectors
            .write()
            .entry(table_id)
            .or_default()
            .extend(vecs.into_iter().map(|(vec, info)| (vec, info, epoch)));
    }
}

impl<R: RangeKv> StateStore for RangeKvStateStore<R> {
    type Local = RangeKvLocalStateStore<R>;
    type ReadSnapshot = RangeKvStateStoreReadSnapshot<R>;
    type VectorWriter = RangeKvLocalStateStore<R>;

    async fn try_wait_epoch(
        &self,
        _epoch: HummockReadEpoch,
        _options: TryWaitEpochOptions,
    ) -> StorageResult<()> {
        // memory backend doesn't need to wait for epoch, so this is a no-op.
        Ok(())
    }

    async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
        RangeKvLocalStateStore::new(self.clone(), option)
    }

    async fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> StorageResult<Self::ReadSnapshot> {
        Ok(self.new_read_snapshot_impl(epoch.get_epoch(), options.table_id))
    }

    async fn new_vector_writer(&self, options: NewVectorWriterOptions) -> Self::VectorWriter {
        RangeKvLocalStateStore::new(
            self.clone(),
            NewLocalOptions {
                table_id: options.table_id,
                op_consistency_level: Default::default(),
                table_option: Default::default(),
                is_replicated: false,
                vnodes: Arc::new(Bitmap::from_bool_slice(&[true])),
                upload_on_flush: true,
            },
        )
    }
}

pub struct RangeKvLocalStateStore<R: RangeKv> {
    mem_table: MemTable,
    vectors: Vec<(Vector, Bytes)>,
    inner: RangeKvStateStore<R>,

    epoch: Option<EpochPair>,

    table_id: TableId,
    op_consistency_level: OpConsistencyLevel,
    table_option: TableOption,
    vnodes: Arc<Bitmap>,
}

impl<R: RangeKv> RangeKvLocalStateStore<R> {
    pub fn new(inner: RangeKvStateStore<R>, option: NewLocalOptions) -> Self {
        Self {
            inner,
            mem_table: MemTable::new(option.table_id, option.op_consistency_level.clone()),
            epoch: None,
            table_id: option.table_id,
            op_consistency_level: option.op_consistency_level,
            table_option: option.table_option,
            vnodes: option.vnodes,
            vectors: vec![],
        }
    }

    fn epoch(&self) -> u64 {
        self.epoch.expect("should have set the epoch").curr
    }
}

impl<R: RangeKv> StateStoreGet for RangeKvLocalStateStore<R> {
    async fn on_key_value<O: Send + 'static>(
        &self,
        key: TableKey<Bytes>,
        _read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        if let Some((key, value)) = match self.mem_table.buffer.get(&key) {
            None => self
                .inner
                .get_keyed_row_impl(key, self.epoch(), self.table_id)?,
            Some(op) => match op {
                KeyOp::Insert(value) | KeyOp::Update((_, value)) => Some((
                    FullKey::new(self.table_id, key, self.epoch()),
                    value.clone(),
                )),
                KeyOp::Delete(_) => None,
            },
        } {
            Ok(Some(on_key_value_fn(key.to_ref(), value.as_ref())?))
        } else {
            Ok(None)
        }
    }
}

impl<R: RangeKv> LocalStateStore for RangeKvLocalStateStore<R> {
    type FlushedSnapshotReader = RangeKvStateStoreReadSnapshot<R>;

    type Iter<'a> = impl StateStoreIter + 'a;
    type RevIter<'a> = impl StateStoreIter + 'a;

    async fn iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::Iter<'_>> {
        let iter = self
            .inner
            .iter_impl(key_range.clone(), self.epoch(), self.table_id)?;
        Ok(FromStreamStateStoreIter::new(Box::pin(merge_stream(
            self.mem_table.iter(key_range),
            iter.into_stream(to_owned_item),
            self.table_id,
            self.epoch(),
            false,
        ))))
    }

    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter<'_>> {
        let iter = self
            .inner
            .rev_iter_impl(key_range.clone(), self.epoch(), self.table_id)?;
        Ok(FromStreamStateStoreIter::new(Box::pin(merge_stream(
            self.mem_table.rev_iter(key_range),
            iter.into_stream(to_owned_item),
            self.table_id,
            self.epoch(),
            true,
        ))))
    }

    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()> {
        match old_val {
            None => self.mem_table.insert(key, new_val)?,
            Some(old_val) => self.mem_table.update(key, old_val, new_val)?,
        };
        Ok(())
    }

    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
        Ok(self.mem_table.delete(key, old_val)?)
    }

    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
        if self.vnodes.len() > 1 {
            TableState::wait_epoch(
                &self.inner.tables,
                self.table_id,
                self.epoch.expect("should have init").prev,
            )
            .await;
        }
        Ok(std::mem::replace(&mut self.vnodes, vnodes))
    }

    fn get_table_watermark(&self, _vnode: VirtualNode) -> Option<Bytes> {
        // TODO: may store the written table watermark and have a correct implementation
        None
    }

    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
        self.inner.new_read_snapshot_impl(MAX_EPOCH, self.table_id)
    }
}

impl<R: RangeKv> StateStoreWriteEpochControl for RangeKvLocalStateStore<R> {
    async fn flush(&mut self) -> StorageResult<usize> {
        let buffer = self.mem_table.drain().into_parts();
        let mut kv_pairs = Vec::with_capacity(buffer.len());
        let sanity_check_read_snapshot = if sanity_check_enabled() {
            Some(self.inner.new_read_snapshot_impl(MAX_EPOCH, self.table_id))
        } else {
            None
        };
        for (key, key_op) in buffer {
            match key_op {
                // Currently, some executors do not strictly comply with these semantics. As
                // a workaround, you may disable the check by initializing the
                // state store with `op_consistency_level=Inconsistent`.
                KeyOp::Insert(value) => {
                    if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
                        do_insert_sanity_check(
                            self.table_id,
                            &key,
                            &value,
                            sanity_check_read_snapshot,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, StorageValue::new_put(value)));
                }
                KeyOp::Delete(old_value) => {
                    if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
                        do_delete_sanity_check(
                            self.table_id,
                            &key,
                            &old_value,
                            sanity_check_read_snapshot,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, StorageValue::new_delete()));
                }
                KeyOp::Update((old_value, new_value)) => {
                    if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
                        do_update_sanity_check(
                            self.table_id,
                            &key,
                            &old_value,
                            &new_value,
                            sanity_check_read_snapshot,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, StorageValue::new_put(new_value)));
                }
            }
        }
        let epoch = self.epoch();
        self.inner
            .ingest_vectors(self.table_id, epoch, take(&mut self.vectors));
        self.inner
            .ingest_batch(kv_pairs, vec![], epoch, self.table_id)
    }

    async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
        assert_eq!(
            self.epoch.replace(options.epoch),
            None,
            "epoch in local state store of table id {:?} is initialized more than once",
            self.table_id
        );
        self.inner
            .tables
            .lock()
            .entry(self.table_id)
            .or_insert_with(|| TableState::new(options.epoch.prev))
            .next_epochs
            .insert(options.epoch.prev, options.epoch.curr);
        if self.vnodes.len() > 1 {
            TableState::wait_epoch(&self.inner.tables, self.table_id, options.epoch.prev).await;
        }

        Ok(())
    }

    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
        assert!(!self.mem_table.is_dirty());
        if let Some(value_checker) = opts.switch_op_consistency_level {
            self.mem_table.op_consistency_level.update(&value_checker);
        }
        let epoch = self
            .epoch
            .as_mut()
            .expect("should have init epoch before seal the first epoch");
        let prev_epoch = epoch.curr;
        epoch.prev = prev_epoch;
        epoch.curr = next_epoch;
        assert!(
            next_epoch > prev_epoch,
            "new epoch {} should be greater than current epoch: {}",
            next_epoch,
            prev_epoch
        );

        let mut tables = self.inner.tables.lock();
        let table_state = tables
            .get_mut(&self.table_id)
            .expect("should be set when init");

        table_state.next_epochs.insert(prev_epoch, next_epoch);
        if self.vnodes.len() > 1 {
            let sealing_epoch_vnodes = table_state
                .sealing_epochs
                .entry(prev_epoch)
                .or_insert_with(|| BitmapBuilder::zeroed(self.vnodes.len()));
            assert_eq!(self.vnodes.len(), sealing_epoch_vnodes.len());
            for vnode in self.vnodes.iter_ones() {
                assert!(!sealing_epoch_vnodes.is_set(vnode));
                sealing_epoch_vnodes.set(vnode, true);
            }
            if (0..self.vnodes.len()).all(|vnode| sealing_epoch_vnodes.is_set(vnode)) {
                let (all_sealed_epoch, _) =
                    table_state.sealing_epochs.pop_first().expect("non-empty");
                assert_eq!(
                    all_sealed_epoch, prev_epoch,
                    "new all_sealed_epoch must be the current prev epoch"
                );
                if let Some(prev_latest_sealed_epoch) =
                    table_state.latest_sealed_epoch.replace(prev_epoch)
                {
                    assert!(prev_epoch > prev_latest_sealed_epoch);
                }
            }
        }

        if let Some((direction, watermarks, _watermark_type)) = opts.table_watermarks {
            let delete_ranges = watermarks
                .iter()
                .flat_map(|vnode_watermark| {
                    let inner_range = match direction {
                        WatermarkDirection::Ascending => {
                            (Unbounded, Excluded(vnode_watermark.watermark().clone()))
                        }
                        WatermarkDirection::Descending => {
                            (Excluded(vnode_watermark.watermark().clone()), Unbounded)
                        }
                    };
                    vnode_watermark
                        .vnode_bitmap()
                        .iter_vnodes()
                        .map(move |vnode| {
                            let (start, end) =
                                prefixed_range_with_vnode(inner_range.clone(), vnode);
                            (start.map(|key| key.0.clone()), end.map(|key| key.0.clone()))
                        })
                })
                .collect_vec();
            if let Err(e) =
                self.inner
                    .ingest_batch(Vec::new(), delete_ranges, self.epoch(), self.table_id)
            {
                error!(error = %e.as_report(), "failed to write delete ranges of table watermark");
            }
        }
    }

    async fn try_flush(&mut self) -> StorageResult<()> {
        Ok(())
    }
}

impl<R: RangeKv> StateStoreWriteVector for RangeKvLocalStateStore<R> {
    fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()> {
        self.vectors.push((vec, info));
        Ok(())
    }
}

pub struct RangeKvStateStoreIter<R: RangeKv> {
    inner: batched_iter::Iter<R>,

    epoch: HummockEpoch,
    is_inclusive_epoch: bool,

    last_key: Option<UserKey<Bytes>>,

    item_buffer: Option<StateStoreKeyedRow>,
}

impl<R: RangeKv> RangeKvStateStoreIter<R> {
    pub fn new(
        inner: batched_iter::Iter<R>,
        epoch: HummockEpoch,
        is_inclusive_epoch: bool,
    ) -> Self {
        Self {
            inner,
            epoch,
            is_inclusive_epoch,
            last_key: None,
            item_buffer: None,
        }
    }
}

impl<R: RangeKv> StateStoreIter for RangeKvStateStoreIter<R> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        self.next_inner()?;
        Ok(self
            .item_buffer
            .as_ref()
            .map(|(key, value)| (key.to_ref(), value.as_ref())))
    }
}

impl<R: RangeKv> RangeKvStateStoreIter<R> {
    fn next_inner(&mut self) -> StorageResult<()> {
        self.item_buffer = None;
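        // Keys arrive ordered by (user key asc, epoch desc), so the first version of a
        // user key that is visible at `self.epoch` is its newest one; older versions
        // and tombstones (`None` values) are skipped.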
        while let Some((key, value)) = self.inner.next()? {
            let epoch = key.epoch_with_gap.pure_epoch();
            if epoch > self.epoch {
                continue;
            }
            if epoch == self.epoch && !self.is_inclusive_epoch {
                continue;
            }
            if Some(key.user_key.as_ref()) != self.last_key.as_ref().map(|key| key.as_ref()) {
                self.last_key = Some(key.user_key.clone());
                if let Some(value) = value {
                    self.item_buffer = Some((key, value));
                    break;
                }
            }
        }
        Ok(())
    }
}

pub struct RangeKvStateStoreRevIter<R: RangeKv> {
    inner: batched_iter::Iter<R>,

    epoch: HummockEpoch,
    is_inclusive_epoch: bool,

    item_buffer: VecDeque<StateStoreKeyedRow>,
}

impl<R: RangeKv> RangeKvStateStoreRevIter<R> {
    pub fn new(
        inner: batched_iter::Iter<R>,
        epoch: HummockEpoch,
        is_inclusive_epoch: bool,
    ) -> Self {
        Self {
            inner,
            epoch,
            is_inclusive_epoch,
            item_buffer: VecDeque::default(),
        }
    }
}

impl<R: RangeKv> StateStoreIter for RangeKvStateStoreRevIter<R> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        self.next_inner()?;
        Ok(self
            .item_buffer
            .back()
            .map(|(key, value)| (key.to_ref(), value.as_ref())))
    }
}

impl<R: RangeKv> RangeKvStateStoreRevIter<R> {
    fn next_inner(&mut self) -> StorageResult<()> {
        self.item_buffer.pop_back();
        while let Some((key, value)) = self.inner.next()? {
            let epoch = key.epoch_with_gap.pure_epoch();
            if epoch > self.epoch {
                continue;
            }
            if epoch == self.epoch && !self.is_inclusive_epoch {
                continue;
            }

            let v = match value {
                Some(v) => v,
                None => {
                    if let Some(last_key) = self.item_buffer.front()
                        && key.user_key.as_ref() == last_key.0.user_key.as_ref()
                    {
                        self.item_buffer.clear();
                    }
                    continue;
                }
            };

            if let Some(last_key) = self.item_buffer.front() {
                if key.user_key.as_ref() != last_key.0.user_key.as_ref() {
                    self.item_buffer.push_front((key, v));
                    break;
                } else {
                    self.item_buffer.pop_front();
                    self.item_buffer.push_front((key, v));
                }
            } else {
                self.item_buffer.push_front((key, v));
            }
        }
        Ok(())
    }
}

pub struct RangeKvStateStoreChangeLogIter<R: RangeKv> {
    new_value_iter: RangeKvStateStoreIter<R>,
    old_value_iter: RangeKvStateStoreIter<R>,
    item_buffer: Option<(TableKey<Bytes>, ChangeLogValue<Bytes>)>,
}

impl<R: RangeKv> RangeKvStateStoreChangeLogIter<R> {
    fn new(
        mut new_value_iter: RangeKvStateStoreIter<R>,
        mut old_value_iter: RangeKvStateStoreIter<R>,
    ) -> StorageResult<Self> {
        new_value_iter.next_inner()?;
        old_value_iter.next_inner()?;
        Ok(Self {
            new_value_iter,
            old_value_iter,
            item_buffer: None,
        })
    }
}

impl<R: RangeKv> StateStoreIter<StateStoreReadLogItem> for RangeKvStateStoreChangeLogIter<R> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreReadLogItemRef<'_>>> {
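        // Merge the two snapshot iterators by user key: a key present only in the new
        // snapshot yields an Insert, only in the old snapshot a Delete, and in both
        // (with differing values) an Update; identical values are skipped as unchanged.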
1451        loop {
1452            match (
1453                &self.new_value_iter.item_buffer,
1454                &self.old_value_iter.item_buffer,
1455            ) {
1456                (None, None) => {
1457                    self.item_buffer = None;
1458                    break;
1459                }
1460                (Some((key, new_value)), None) => {
1461                    self.item_buffer = Some((
1462                        key.user_key.table_key.clone(),
1463                        ChangeLogValue::Insert(new_value.clone()),
1464                    ));
1465                    self.new_value_iter.next_inner()?;
1466                }
1467                (None, Some((key, old_value))) => {
1468                    self.item_buffer = Some((
1469                        key.user_key.table_key.clone(),
1470                        ChangeLogValue::Delete(old_value.clone()),
1471                    ));
1472                    self.old_value_iter.next_inner()?;
1473                }
1474                (Some((new_value_key, new_value)), Some((old_value_key, old_value))) => {
1475                    match new_value_key.user_key.cmp(&old_value_key.user_key) {
1476                        Ordering::Less => {
1477                            self.item_buffer = Some((
1478                                new_value_key.user_key.table_key.clone(),
1479                                ChangeLogValue::Insert(new_value.clone()),
1480                            ));
1481                            self.new_value_iter.next_inner()?;
1482                        }
1483                        Ordering::Greater => {
1484                            self.item_buffer = Some((
1485                                old_value_key.user_key.table_key.clone(),
1486                                ChangeLogValue::Delete(old_value.clone()),
1487                            ));
1488                            self.old_value_iter.next_inner()?;
1489                        }
1490                        Ordering::Equal => {
1491                            if new_value == old_value {
1492                                self.new_value_iter.next_inner()?;
1493                                self.old_value_iter.next_inner()?;
1494                                continue;
1495                            }
1496                            self.item_buffer = Some((
1497                                new_value_key.user_key.table_key.clone(),
1498                                ChangeLogValue::Update {
1499                                    new_value: new_value.clone(),
1500                                    old_value: old_value.clone(),
1501                                },
1502                            ));
1503                            self.new_value_iter.next_inner()?;
1504                            self.old_value_iter.next_inner()?;
1505                        }
1506                    }
1507                }
1508            };
1509            break;
1510        }
1511        Ok(self
1512            .item_buffer
1513            .as_ref()
1514            .map(|(key, value)| (key.to_ref(), value.to_ref())))
1515    }
1516}
1517
1518#[cfg(test)]
1519mod tests {
1520    use risingwave_common::util::epoch::test_epoch;
1521
1522    use super::*;
1523    use crate::hummock::iterator::test_utils::{
1524        iterator_test_table_key_of, iterator_test_value_of,
1525    };
1526    use crate::hummock::test_utils::{ReadOptions, *};
1527    use crate::memory::sled::SledStateStore;
1528
1529    #[tokio::test]
1530    async fn test_snapshot_isolation_memory() {
1531        let state_store = MemoryStateStore::new();
1532        test_snapshot_isolation_inner(state_store).await;
1533    }
1534
1535    #[cfg(not(madsim))]
1536    #[tokio::test]
1537    async fn test_snapshot_isolation_sled() {
1538        let state_store = SledStateStore::new_temp();
1539        test_snapshot_isolation_inner(state_store).await;
1540    }
1541
    async fn test_snapshot_isolation_inner(state_store: RangeKvStateStore<impl RangeKv>) {
        state_store
            .ingest_batch(
                vec![
                    (
                        TableKey(Bytes::from(b"a".to_vec())),
                        StorageValue::new_put(b"v1".to_vec()),
                    ),
                    (
                        TableKey(Bytes::from(b"b".to_vec())),
                        StorageValue::new_put(b"v1".to_vec()),
                    ),
                ],
                vec![],
                0,
                Default::default(),
            )
            .unwrap();
        state_store
            .ingest_batch(
                vec![
                    (
                        TableKey(Bytes::from(b"a".to_vec())),
                        StorageValue::new_put(b"v2".to_vec()),
                    ),
                    (
                        TableKey(Bytes::from(b"b".to_vec())),
                        StorageValue::new_delete(),
                    ),
                ],
                vec![],
                test_epoch(1),
                Default::default(),
            )
            .unwrap();
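        // At epoch 0 a scan must still observe the original values; the
        // epoch-1 writes are invisible to it.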
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    0,
                    TableId::default(),
                    None,
                )
                .unwrap(),
            vec![
                (
                    FullKey::for_test(Default::default(), Bytes::from("a"), 0)
                        .encode()
                        .into(),
                    b"v1".to_vec().into()
                ),
                (
                    FullKey::for_test(Default::default(), Bytes::from("b"), 0)
                        .encode()
                        .into(),
                    b"v1".to_vec().into()
                )
            ]
        );
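        // With limit = Some(1), the scan stops after the first matching key.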
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    0,
                    TableId::default(),
                    Some(1),
                )
                .unwrap(),
            vec![(
                FullKey::for_test(Default::default(), b"a".to_vec(), 0)
                    .encode()
                    .into(),
                b"v1".to_vec().into()
            )]
        );
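        // At epoch 1 the scan sees the updated value of "a" and no longer
        // sees "b", which was deleted in the second batch.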
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    test_epoch(1),
                    TableId::default(),
                    None,
                )
                .unwrap(),
            vec![(
                FullKey::for_test(Default::default(), b"a".to_vec(), test_epoch(1))
                    .encode()
                    .into(),
                b"v2".to_vec().into()
            )]
        );
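        // Point gets mirror the scans: epoch 0 reads the original values,
        // epoch 1 reads the update to "a" and the deletion of "b", and a
        // missing key ("c") returns None at either epoch.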
        assert_eq!(
            state_store
                .get(TableKey(Bytes::from("a")), 0, ReadOptions::default())
                .await
                .unwrap(),
            Some(Bytes::from("v1"))
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"b")),
                    0,
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            Some(b"v1".to_vec().into())
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"c")),
                    0,
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            None
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"a")),
                    test_epoch(1),
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            Some(b"v2".to_vec().into())
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::from("b")),
                    test_epoch(1),
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            None
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::from("c")),
                    test_epoch(1),
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            None
        );
    }

    #[tokio::test]
    async fn test_iter_log_memory() {
        let state_store = MemoryStateStore::new();
        test_iter_log_inner(state_store).await;
    }

    #[cfg(not(madsim))]
    #[tokio::test]
    async fn test_iter_log_sled() {
        let state_store = SledStateStore::new_temp();
        test_iter_log_inner(state_store).await;
    }

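    // Exercises iter_log: inserts at epoch1, then an update/delete/insert at
    // epoch2, and checks the change log per epoch as well as the log merged
    // across both epochs.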
    async fn test_iter_log_inner(state_store: RangeKvStateStore<impl RangeKv>) {
        let table_id = TableId::new(233);
        let epoch1 = test_epoch(1);
        let key_idx = [1, 2, 4];
        let make_key = |i| TableKey(Bytes::from(iterator_test_table_key_of(i)));
        let make_value = |i| Bytes::from(iterator_test_value_of(i));
        state_store
            .ingest_batch(
                key_idx
                    .iter()
                    .map(|i| (make_key(*i), StorageValue::new_put(make_value(*i))))
                    .collect(),
                vec![],
                epoch1,
                table_id,
            )
            .unwrap();
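        // The log of the first epoch alone reports every key as an insert.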
        {
            let mut iter = state_store
                .iter_log(
                    (epoch1, epoch1),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            for i in key_idx {
                let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(make_key(i).to_ref(), iter_key);
                assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }

        let epoch2 = test_epoch(2);
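        // The second epoch updates key 1, deletes key 2, and inserts key 3.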
        state_store
            .ingest_batch(
                vec![
                    (make_key(1), StorageValue::new_put(make_value(12))), // update
                    (make_key(2), StorageValue::new_delete()),            // delete
                    (make_key(3), StorageValue::new_put(make_value(3))),
                ],
                vec![],
                epoch2,
                table_id,
            )
            .unwrap();

        // Check the change log of the second epoch: it should report the
        // update, delete, and insert made on top of epoch1.
        {
            let expected = vec![
                (
                    make_key(1),
                    ChangeLogValue::Update {
                        new_value: make_value(12),
                        old_value: make_value(1),
                    },
                ),
                (make_key(2), ChangeLogValue::Delete(make_value(2))),
                (make_key(3), ChangeLogValue::Insert(make_value(3))),
            ];
            let mut iter = state_store
                .iter_log(
                    (epoch2, epoch2),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            for (key, change_log_value) in expected {
                let (iter_key, iter_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(
                    key.to_ref(),
                    iter_key,
                    "{:?} {:?}",
                    change_log_value.to_ref(),
                    iter_value
                );
                assert_eq!(change_log_value.to_ref(), iter_value);
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }
        // Check that the change log of the original epoch is unaffected by
        // the later write.
        {
            let mut iter = state_store
                .iter_log(
                    (epoch1, epoch1),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            for i in key_idx {
                let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(make_key(i).to_ref(), iter_key);
                assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }
        // Check the change log merged across both epochs.
        {
            let mut iter = state_store
                .iter_log(
                    (epoch1, epoch2),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
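            // Merged across both epochs, key 1 collapses to a single insert
            // of its final value, and key 2 (inserted then deleted within the
            // range) produces no entry at all.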
            let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
            assert_eq!(make_key(1).to_ref(), iter_key);
            assert_eq!(
                change_value,
                ChangeLogValue::Insert(make_value(12).as_ref())
            );
            for i in [3, 4] {
                let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(make_key(i).to_ref(), iter_key);
                assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }
    }
}