risingwave_storage/memory.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::mem::take;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::ops::{Bound, RangeBounds};
use std::sync::{Arc, LazyLock};

use bytes::Bytes;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::array::VectorRef;
use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::TableId;
use risingwave_common::dispatch_distance_measurement;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::id::FragmentId;
use risingwave_common::types::ScalarRef;
use risingwave_common::util::epoch::{EpochPair, MAX_EPOCH};
use risingwave_hummock_sdk::key::{
    FullKey, TableKey, TableKeyRange, UserKey, prefixed_range_with_vnode,
};
use risingwave_hummock_sdk::table_watermark::WatermarkDirection;
use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
use thiserror_ext::AsReport;
use tokio::task::yield_now;
use tracing::error;

use crate::error::StorageResult;
use crate::hummock::HummockError;
use crate::hummock::utils::{
    do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, merge_stream,
    sanity_check_enabled,
};
use crate::mem_table::{KeyOp, MemTable};
use crate::storage_value::StorageValue;
use crate::store::*;
use crate::vector::{MeasureDistanceBuilder, NearestBuilder};

pub type BytesFullKey = FullKey<Bytes>;
pub type BytesFullKeyRange = (Bound<BytesFullKey>, Bound<BytesFullKey>);

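/// An ordered key-value backend for `RangeKvStateStore`. Implementations return
/// entries in ascending `FullKey` order from `range` (descending from `rev_range`),
/// and represent deletions as `None` values (tombstones).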
#[allow(clippy::type_complexity)]
pub trait RangeKv: Clone + Send + Sync + 'static {
    fn range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>>;

    fn rev_range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>>;

    fn ingest_batch(
        &self,
        kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
    ) -> StorageResult<()>;

    fn flush(&self) -> StorageResult<()>;
}

pub type BTreeMapRangeKv = Arc<RwLock<BTreeMap<BytesFullKey, Option<Bytes>>>>;

impl RangeKv for BTreeMapRangeKv {
    fn range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
        let limit = limit.unwrap_or(usize::MAX);
        Ok(self
            .read()
            .range(range)
            .take(limit)
            .map(|(key, value)| (key.clone(), value.clone()))
            .collect())
    }

    fn rev_range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
        let limit = limit.unwrap_or(usize::MAX);
        Ok(self
            .read()
            .range(range)
            .rev()
            .take(limit)
            .map(|(key, value)| (key.clone(), value.clone()))
            .collect())
    }

    fn ingest_batch(
        &self,
        kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
    ) -> StorageResult<()> {
        let mut inner = self.write();
        for (key, value) in kv_pairs {
            inner.insert(key, value);
        }
        Ok(())
    }

    fn flush(&self) -> StorageResult<()> {
        Ok(())
    }
}

pub mod sled {
    use std::fs::create_dir_all;
    use std::ops::RangeBounds;

    use bytes::Bytes;
    use risingwave_hummock_sdk::key::FullKey;

    use crate::error::StorageResult;
    use crate::memory::{BytesFullKey, BytesFullKeyRange, RangeKv, RangeKvStateStore};

    #[derive(Clone)]
    pub struct SledRangeKv {
        inner: sled::Db,
    }

    impl SledRangeKv {
        pub fn new(path: impl AsRef<std::path::Path>) -> Self {
            SledRangeKv {
                inner: sled::open(path).expect("open"),
            }
        }

        pub fn new_temp() -> Self {
            create_dir_all("./.risingwave/sled").expect("should create");
            let path = tempfile::TempDir::new_in("./.risingwave/sled")
                .expect("find temp dir")
                .keep();
            Self::new(path)
        }
    }

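    // Value encoding in sled: the first byte tags the entry. `EMPTY` marks a tombstone
    // (no user value); `NON_EMPTY` is followed by the raw value bytes.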
    const EMPTY: u8 = 1;
    const NON_EMPTY: u8 = 0;

    impl RangeKv for SledRangeKv {
        fn range(
            &self,
            range: BytesFullKeyRange,
            limit: Option<usize>,
        ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
            let (left, right) = range;
            let full_key_ref_bound = (
                left.as_ref().map(FullKey::to_ref),
                right.as_ref().map(FullKey::to_ref),
            );
            let left_encoded = left.as_ref().map(|key| key.to_ref().encode_reverse_epoch());
            let right_encoded = right
                .as_ref()
                .map(|key| key.to_ref().encode_reverse_epoch());
            let limit = limit.unwrap_or(usize::MAX);
            let mut ret = vec![];
            for result in self.inner.range((left_encoded, right_encoded)).take(limit) {
                let (key, value) = result?;
                let full_key = FullKey::decode_reverse_epoch(key.as_ref()).copy_into();
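                // The byte order of encoded keys may disagree with `FullKey` order when
                // table keys have different lengths, so the raw sled range can yield
                // false positives; re-check against the decoded-key bounds (see
                // `test_filter_variable_key_length_false_positive` below).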
                if !full_key_ref_bound.contains(&full_key.to_ref()) {
                    continue;
                }
                let value = match value.as_ref() {
                    [EMPTY] => None,
                    [NON_EMPTY, rest @ ..] => Some(Bytes::from(Vec::from(rest))),
                    _ => unreachable!("malformed value: {:?}", value),
                };
                ret.push((full_key, value))
            }
            Ok(ret)
        }

        fn rev_range(
            &self,
            range: BytesFullKeyRange,
            limit: Option<usize>,
        ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
            let (left, right) = range;
            let full_key_ref_bound = (
                left.as_ref().map(FullKey::to_ref),
                right.as_ref().map(FullKey::to_ref),
            );
            let left_encoded = left.as_ref().map(|key| key.to_ref().encode_reverse_epoch());
            let right_encoded = right
                .as_ref()
                .map(|key| key.to_ref().encode_reverse_epoch());
            let limit = limit.unwrap_or(usize::MAX);
            let mut ret = vec![];
            for result in self
                .inner
                .range((left_encoded, right_encoded))
                .rev()
                .take(limit)
            {
                let (key, value) = result?;
                let full_key = FullKey::decode_reverse_epoch(key.as_ref()).copy_into();
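                // Same variable-key-length false-positive check as in `range` above.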
                if !full_key_ref_bound.contains(&full_key.to_ref()) {
                    continue;
                }
                let value = match value.as_ref() {
                    [EMPTY] => None,
                    [NON_EMPTY, rest @ ..] => Some(Bytes::from(Vec::from(rest))),
                    _ => unreachable!("malformed value: {:?}", value),
                };
                ret.push((full_key, value))
            }
            Ok(ret)
        }

        fn ingest_batch(
            &self,
            kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
        ) -> StorageResult<()> {
            let mut batch = sled::Batch::default();
            for (key, value) in kv_pairs {
                let encoded_key = key.encode_reverse_epoch();
                let key = sled::IVec::from(encoded_key);
                let mut buffer =
                    Vec::with_capacity(value.as_ref().map(|v| v.len()).unwrap_or_default() + 1);
                if let Some(value) = value {
                    buffer.push(NON_EMPTY);
                    buffer.extend_from_slice(value.as_ref());
                } else {
                    buffer.push(EMPTY);
                }
                let value = sled::IVec::from(buffer);
                batch.insert(key, value);
            }
            self.inner.apply_batch(batch)?;
            Ok(())
        }

        fn flush(&self) -> StorageResult<()> {
            Ok(self.inner.flush().map(|_| {})?)
        }
    }

    pub type SledStateStore = RangeKvStateStore<SledRangeKv>;

    impl SledStateStore {
        pub fn new(path: impl AsRef<std::path::Path>) -> Self {
            RangeKvStateStore {
                inner: SledRangeKv::new(path),
                tables: Default::default(),
                vectors: Default::default(),
            }
        }

        pub fn new_temp() -> Self {
            RangeKvStateStore {
                inner: SledRangeKv::new_temp(),
                tables: Default::default(),
                vectors: Default::default(),
            }
        }
    }

    #[cfg(test)]
    mod test {
        use std::ops::{Bound, RangeBounds};

        use bytes::Bytes;
        use risingwave_common::catalog::TableId;
        use risingwave_common::util::epoch::EPOCH_SPILL_TIME_MASK;
        use risingwave_hummock_sdk::EpochWithGap;
        use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};

        use crate::memory::RangeKv;
        use crate::memory::sled::SledRangeKv;

        #[test]
        fn test_filter_variable_key_length_false_positive() {
            let table_id = TableId::new(233);
            let epoch = u64::MAX - u64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]);
            let excluded_short_table_key = [0, 1, 0, 0];
            let included_long_table_key = [0, 1, 0, 0, 1, 2];
            let left_table_key = [0, 1, 0, 0, 1];
            let right_table_key = [0, 1, 1, 1];

            let to_full_key = |table_key: &[u8]| FullKey {
                user_key: UserKey {
                    table_id,
                    table_key: TableKey(Bytes::from(table_key.to_vec())),
                },
                epoch_with_gap: EpochWithGap::new_from_epoch(epoch & !EPOCH_SPILL_TIME_MASK),
            };

            let left_full_key = to_full_key(&left_table_key[..]);
            let right_full_key = to_full_key(&right_table_key[..]);
            let included_long_full_key = to_full_key(&included_long_table_key[..]);
            let excluded_short_full_key = to_full_key(&excluded_short_table_key[..]);

            assert!(
                (
                    Bound::Included(left_full_key.to_ref()),
                    Bound::Included(right_full_key.to_ref())
                )
                    .contains(&included_long_full_key.to_ref())
            );
            assert!(
                !(
                    Bound::Included(left_full_key.to_ref()),
                    Bound::Included(right_full_key.to_ref())
                )
                    .contains(&excluded_short_full_key.to_ref())
            );

            let left_encoded = left_full_key.encode_reverse_epoch();
            let right_encoded = right_full_key.encode_reverse_epoch();

            assert!(
                (
                    Bound::Included(left_encoded.clone()),
                    Bound::Included(right_encoded.clone())
                )
                    .contains(&included_long_full_key.encode_reverse_epoch())
            );
            assert!(
                (
                    Bound::Included(left_encoded),
                    Bound::Included(right_encoded)
                )
                    .contains(&excluded_short_full_key.encode_reverse_epoch())
            );

            let sled_range_kv = SledRangeKv::new_temp();
            sled_range_kv
                .ingest_batch(
                    vec![
                        (included_long_full_key.clone(), None),
                        (excluded_short_full_key, None),
                    ]
                    .into_iter(),
                )
                .unwrap();
            let kvs = sled_range_kv
                .range(
                    (
                        Bound::Included(left_full_key),
                        Bound::Included(right_full_key),
                    ),
                    None,
                )
                .unwrap();
            assert_eq!(1, kvs.len());
            assert_eq!(included_long_full_key.to_ref(), kvs[0].0.to_ref());
            assert!(kvs[0].1.is_none());
        }
    }
}

mod batched_iter {

    use super::*;

    /// A utility struct for iterating over a range of keys in a locked `BTreeMap`. Records are
    /// fetched in batches to trade the copying overhead against the number of times the lock
    /// must be acquired.
    ///
    /// Therefore, it's not guaranteed that we're iterating over a consistent snapshot of the map.
    /// Users should handle MVCC by themselves.
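    ///
    /// A minimal usage sketch (not a doctest; `range_kv` is any `RangeKv`):
    /// ```ignore
    /// let mut iter = Iter::new(range_kv.clone(), range, /* rev */ false);
    /// while let Some((full_key, value)) = iter.next()? {
    ///     // `value` is `None` for tombstones.
    /// }
    /// ```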
    pub struct Iter<R: RangeKv> {
        inner: R,
        range: BytesFullKeyRange,
        current: std::vec::IntoIter<(FullKey<Bytes>, Option<Bytes>)>,
        rev: bool,
    }

    impl<R: RangeKv> Iter<R> {
        pub fn new(inner: R, range: BytesFullKeyRange, rev: bool) -> Self {
            Self {
                inner,
                range,
                rev,
                current: Vec::new().into_iter(),
            }
        }
    }

    impl<R: RangeKv> Iter<R> {
        const BATCH_SIZE: usize = 256;

        /// Get the next batch of records and fill the `current` buffer.
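        /// The matching bound of `range` is then advanced past the last key returned, so
        /// the next refill resumes where this one ended (for reverse iteration, the upper
        /// bound shrinks instead).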
        fn refill(&mut self) -> StorageResult<()> {
            assert!(self.current.is_empty());

            let batch = if self.rev {
                self.inner.rev_range(
                    (self.range.0.clone(), self.range.1.clone()),
                    Some(Self::BATCH_SIZE),
                )?
            } else {
                self.inner.range(
                    (self.range.0.clone(), self.range.1.clone()),
                    Some(Self::BATCH_SIZE),
                )?
            };

            if let Some((last_key, _)) = batch.last() {
                let full_key = FullKey::new_with_gap_epoch(
                    last_key.user_key.table_id,
                    TableKey(last_key.user_key.table_key.0.clone()),
                    last_key.epoch_with_gap,
                );
                if self.rev {
                    self.range.1 = Bound::Excluded(full_key);
                } else {
                    self.range.0 = Bound::Excluded(full_key);
                }
            }
            self.current = batch.into_iter();
            Ok(())
        }
    }

    impl<R: RangeKv> Iter<R> {
        #[allow(clippy::type_complexity)]
        pub fn next(&mut self) -> StorageResult<Option<(BytesFullKey, Option<Bytes>)>> {
            match self.current.next() {
                Some((key, value)) => Ok(Some((key, value))),
                None => {
                    self.refill()?;
                    Ok(self.current.next())
                }
            }
        }
    }

    #[cfg(test)]
    mod tests {
        use rand::Rng;

        use super::*;
        use crate::memory::sled::SledRangeKv;

        #[test]
        fn test_btreemap_iter_chaos() {
            let map = Arc::new(RwLock::new(BTreeMap::new()));
            test_iter_chaos_inner(map, 1000);
        }

        #[cfg(not(madsim))]
        #[test]
        fn test_sled_iter_chaos() {
            let map = SledRangeKv::new_temp();
            test_iter_chaos_inner(map, 100);
        }

        fn test_iter_chaos_inner(map: impl RangeKv, count: usize) {
            let key_range = 1..=10000;
            let num_to_bytes = |k: i32| Bytes::from(format!("{:06}", k).as_bytes().to_vec());
            let num_to_full_key =
                |k: i32| FullKey::new(TableId::default(), TableKey(num_to_bytes(k)), 0);
            #[allow(clippy::mutable_key_type)]
            map.ingest_batch(key_range.clone().map(|k| {
                let key = num_to_full_key(k);
                let b = key.user_key.table_key.0.clone();

                (key, Some(b))
            }))
            .unwrap();

            let rand_bound = || {
                let key = rand::rng().random_range(key_range.clone());
                let key = num_to_full_key(key);
                match rand::rng().random_range(1..=5) {
                    1 | 2 => Bound::Included(key),
                    3 | 4 => Bound::Excluded(key),
                    _ => Bound::Unbounded,
                }
            };

            for _ in 0..count {
                let range = loop {
                    let range = (rand_bound(), rand_bound());
                    let (start, end) = (range.start_bound(), range.end_bound());

                    // Filter out invalid ranges. Code migrated from `BTreeMap::range`.
                    match (start, end) {
                        (Bound::Excluded(s), Bound::Excluded(e)) if s == e => {
                            continue;
                        }
                        (
                            Bound::Included(s) | Bound::Excluded(s),
                            Bound::Included(e) | Bound::Excluded(e),
                        ) if s > e => {
                            continue;
                        }
                        _ => break range,
                    }
                };

                let v1 = {
                    let mut v = vec![];
                    let mut iter = Iter::new(map.clone(), range.clone(), false);
                    while let Some((key, value)) = iter.next().unwrap() {
                        v.push((key, value));
                    }
                    v
                };
                let v2 = map.range(range, None).unwrap();

                // Items iterated from the batched iterator should be the same as from a normal iterator.
                assert_eq!(v1, v2);
            }
        }
    }
}

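/// An in-memory state store backed by a shared `BTreeMap`, for tests and local development.
///
/// A minimal usage sketch (not a doctest):
/// ```ignore
/// let store = MemoryStateStore::new();     // a fresh, isolated store
/// let shared = MemoryStateStore::shared(); // a process-wide shared instance
/// ```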
pub type MemoryStateStore = RangeKvStateStore<BTreeMapRangeKv>;

struct TableState {
    init_epoch: u64,
    next_epochs: BTreeMap<u64, u64>,
    latest_sealed_epoch: Option<u64>,
    sealing_epochs: BTreeMap<u64, BitmapBuilder>,
}

impl TableState {
    fn new(init_epoch: u64) -> Self {
        Self {
            init_epoch,
            next_epochs: Default::default(),
            latest_sealed_epoch: None,
            sealing_epochs: Default::default(),
        }
    }
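
    /// Busy-waits (yielding to the async runtime between polls) until `epoch` is the
    /// table's init epoch or is no newer than the latest fully sealed epoch.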
    async fn wait_epoch(
        tables: &parking_lot::Mutex<HashMap<TableId, Self>>,
        table_id: TableId,
        epoch: u64,
    ) {
        loop {
            {
                let tables = tables.lock();
                let table_state = tables.get(&table_id).expect("should exist");
                assert!(epoch >= table_state.init_epoch);
                if epoch == table_state.init_epoch {
                    return;
                }
                if let Some(latest_sealed_epoch) = table_state.latest_sealed_epoch
                    && latest_sealed_epoch >= epoch
                {
                    return;
                }
            }
            yield_now().await;
        }
    }
}

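/// Maps `table_id` to the vectors written to it, stored as `(vector, payload, write epoch)`.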
type InMemVectorStore = Arc<RwLock<HashMap<TableId, Vec<(Vector, Bytes, u64)>>>>;

/// An in-memory state store.
///
/// The in-memory state store is a [`BTreeMap`], which maps [`FullKey`] to value. It
/// never does GC, so memory usage will be high. Therefore, the in-memory state store
/// should never be used in production.
#[derive(Clone, Default)]
pub struct RangeKvStateStore<R: RangeKv> {
    /// Stores (key, epoch) -> user value.
    inner: R,
    /// `table_id` -> `prev_epoch` -> `curr_epoch`
    tables: Arc<parking_lot::Mutex<HashMap<TableId, TableState>>>,

    vectors: InMemVectorStore,
}

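/// Converts a table-key range into a `FullKey` range for `table_id`.
///
/// Versions of the same user key sort newest-epoch-first (epochs are encoded reversed),
/// so an included start bound uses `HummockEpoch::MAX` to cover every version of the key,
/// while an excluded start bound uses epoch `0` so that all versions of that key fall
/// before the range. End bounds are chosen symmetrically, and an unbounded end is capped
/// at the first key of the next table id.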
fn to_full_key_range<R, B>(table_id: TableId, table_key_range: R) -> BytesFullKeyRange
where
    R: RangeBounds<B> + Send,
    B: AsRef<[u8]>,
{
    let start = match table_key_range.start_bound() {
        Included(k) => Included(FullKey::new(
            table_id,
            TableKey(Bytes::from(k.as_ref().to_vec())),
            HummockEpoch::MAX,
        )),
        Excluded(k) => Excluded(FullKey::new(
            table_id,
            TableKey(Bytes::from(k.as_ref().to_vec())),
            0,
        )),
        Unbounded => Included(FullKey::new(
            table_id,
            TableKey(Bytes::from(b"".to_vec())),
            HummockEpoch::MAX,
        )),
    };
    let end = match table_key_range.end_bound() {
        Included(k) => Included(FullKey::new(
            table_id,
            TableKey(Bytes::from(k.as_ref().to_vec())),
            0,
        )),
        Excluded(k) => Excluded(FullKey::new(
            table_id,
            TableKey(Bytes::from(k.as_ref().to_vec())),
            HummockEpoch::MAX,
        )),
        Unbounded => {
            if let Some(next_table_id) = table_id.as_raw_id().checked_add(1) {
                Excluded(FullKey::new(
                    next_table_id.into(),
                    TableKey(Bytes::from(b"".to_vec())),
                    HummockEpoch::MAX,
                ))
            } else {
                Unbounded
            }
        }
    };
    (start, end)
}

impl MemoryStateStore {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn shared() -> Self {
        static STORE: LazyLock<MemoryStateStore> = LazyLock::new(MemoryStateStore::new);
        STORE.clone()
    }
}

impl<R: RangeKv> RangeKvStateStore<R> {
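    /// Scans `key_range` at `epoch`, returning at most `limit` encoded key-value pairs.
    /// For each user key, only the newest version visible at `epoch` is considered, and
    /// tombstones are skipped.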
    fn scan(
        &self,
        key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(Bytes, Bytes)>> {
        let mut data = vec![];
        if limit == Some(0) {
            return Ok(vec![]);
        }
        let mut last_user_key = None;
        for (key, value) in self
            .inner
            .range(to_full_key_range(table_id, key_range), None)?
        {
            if key.epoch_with_gap.pure_epoch() > epoch {
                continue;
            }
            if Some(&key.user_key) != last_user_key.as_ref() {
                if let Some(value) = value {
                    data.push((Bytes::from(key.encode()), value.clone()));
                }
                last_user_key = Some(key.user_key.clone());
            }
            if let Some(limit) = limit
                && data.len() >= limit
            {
                break;
            }
        }
        Ok(data)
    }
}

#[derive(Clone)]
pub struct RangeKvStateStoreReadSnapshot<R: RangeKv> {
    inner: RangeKvStateStore<R>,
    epoch: u64,
    table_id: TableId,
}

impl<R: RangeKv> StateStoreGet for RangeKvStateStoreReadSnapshot<R> {
    async fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        _read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        self.inner
            .get_keyed_row_impl(key, self.epoch, self.table_id)
            .and_then(|option| {
                if let Some((key, value)) = option {
                    on_key_value_fn(key.to_ref(), value.as_ref()).map(Some)
                } else {
                    Ok(None)
                }
            })
    }
}

impl<R: RangeKv> StateStoreRead for RangeKvStateStoreReadSnapshot<R> {
    type Iter = RangeKvStateStoreIter<R>;
    type RevIter = RangeKvStateStoreRevIter<R>;

    async fn iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::Iter> {
        self.inner.iter_impl(key_range, self.epoch, self.table_id)
    }

    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter> {
        self.inner
            .rev_iter_impl(key_range, self.epoch, self.table_id)
    }
}

impl<R: RangeKv> StateStoreReadVector for RangeKvStateStoreReadSnapshot<R> {
    async fn nearest<'a, O: Send + 'a>(
        &'a self,
        vec: VectorRef<'a>,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
    ) -> StorageResult<Vec<O>> {
        fn nearest_impl<'a, M: MeasureDistanceBuilder, O>(
            store: &'a InMemVectorStore,
            epoch: u64,
            table_id: TableId,
            vec: VectorRef<'a>,
            options: VectorNearestOptions,
            on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
        ) -> Vec<O> {
            let mut builder = NearestBuilder::<'_, O, M>::new(vec, options.top_n);
            builder.add(
                store
                    .read()
                    .get(&table_id)
                    .map(|vec| vec.iter())
                    .into_iter()
                    .flatten()
                    .filter(|(_, _, vector_epoch)| epoch >= *vector_epoch)
                    .map(|(vec, info, _)| (vec.to_ref(), info.as_ref())),
                on_nearest_item_fn,
            );
            builder.finish()
        }
        dispatch_distance_measurement!(options.measure, MeasurementType, {
            Ok(nearest_impl::<MeasurementType, O>(
                &self.inner.vectors,
                self.epoch,
                self.table_id,
                vec,
                options,
                on_nearest_item_fn,
            ))
        })
    }
}

impl<R: RangeKv> RangeKvStateStore<R> {
    fn get_keyed_row_impl(
        &self,
        key: TableKey<Bytes>,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<Option<StateStoreKeyedRow>> {
        let range_bounds = (Bound::Included(key.clone()), Bound::Included(key));
        // We do not really care about vnodes here, so we just use the default value.
        let res = self.scan(range_bounds, epoch, table_id, Some(1))?;

        Ok(match res.as_slice() {
            [] => None,
            [(key, value)] => Some((
                FullKey::decode(key.as_ref()).to_vec().into_bytes(),
                value.clone(),
            )),
            _ => unreachable!(),
        })
    }

    fn iter_impl(
        &self,
        key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<RangeKvStateStoreIter<R>> {
        Ok(RangeKvStateStoreIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(table_id, key_range),
                false,
            ),
            epoch,
            true,
        ))
    }

    fn rev_iter_impl(
        &self,
        key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<RangeKvStateStoreRevIter<R>> {
        Ok(RangeKvStateStoreRevIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(table_id, key_range),
                true,
            ),
            epoch,
            true,
        ))
    }
}

impl<R: RangeKv> StateStoreReadLog for RangeKvStateStore<R> {
    type ChangeLogIter = RangeKvStateStoreChangeLogIter<R>;

    async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
        loop {
            {
                let tables = self.tables.lock();
                let Some(tables) = tables.get(&options.table_id) else {
                    return Err(HummockError::next_epoch(format!(
                        "table {} does not exist",
                        options.table_id
                    ))
                    .into());
                };
                if let Some(next_epoch) = tables.next_epochs.get(&epoch) {
                    break Ok(*next_epoch);
                }
            }
            yield_now().await;
        }
    }

    async fn iter_log(
        &self,
        (min_epoch, max_epoch): (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> StorageResult<Self::ChangeLogIter> {
        let new_value_iter = RangeKvStateStoreIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(options.table_id, key_range.clone()),
                false,
            ),
            max_epoch,
            true,
        );
        let old_value_iter = RangeKvStateStoreIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(options.table_id, key_range),
                false,
            ),
            min_epoch,
            false,
        );
        RangeKvStateStoreChangeLogIter::new(new_value_iter, old_value_iter)
    }
}

impl<R: RangeKv> RangeKvStateStore<R> {
    fn new_read_snapshot_impl(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> RangeKvStateStoreReadSnapshot<R> {
        RangeKvStateStoreReadSnapshot {
            inner: self.clone(),
            epoch,
            table_id,
        }
    }

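    /// Writes `kv_pairs` at `epoch` into the underlying `RangeKv`. `delete_ranges` are
    /// first expanded into point tombstones for every existing key they cover, since
    /// `RangeKv` has no native range deletes. Returns the ingested payload size in bytes.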
    pub(crate) fn ingest_batch(
        &self,
        mut kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
        delete_ranges: Vec<(Bound<Bytes>, Bound<Bytes>)>,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<usize> {
        let mut delete_keys = BTreeSet::new();
        for del_range in delete_ranges {
            for (key, _) in self.inner.range(
                (
                    del_range
                        .0
                        .map(|table_key| FullKey::new(table_id, TableKey(table_key), epoch)),
                    del_range
                        .1
                        .map(|table_key| FullKey::new(table_id, TableKey(table_key), epoch)),
                ),
                None,
            )? {
                delete_keys.insert(key.user_key.table_key);
            }
        }
        for key in delete_keys {
            kv_pairs.push((key, StorageValue::new_delete()));
        }

        let mut size = 0;
        self.inner
            .ingest_batch(kv_pairs.into_iter().map(|(key, value)| {
                size += key.len() + value.size();
                (FullKey::new(table_id, key, epoch), value.user_value)
            }))?;
        Ok(size)
    }

    fn ingest_vectors(&self, table_id: TableId, epoch: u64, vecs: Vec<(Vector, Bytes)>) {
        self.vectors
            .write()
            .entry(table_id)
            .or_default()
            .extend(vecs.into_iter().map(|(vec, info)| (vec, info, epoch)));
    }
}

impl<R: RangeKv> StateStore for RangeKvStateStore<R> {
    type Local = RangeKvLocalStateStore<R>;
    type ReadSnapshot = RangeKvStateStoreReadSnapshot<R>;
    type VectorWriter = RangeKvLocalStateStore<R>;

    async fn try_wait_epoch(
        &self,
        _epoch: HummockReadEpoch,
        _options: TryWaitEpochOptions,
    ) -> StorageResult<()> {
        // memory backend doesn't need to wait for epoch, so this is a no-op.
        Ok(())
    }

    async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
        RangeKvLocalStateStore::new(self.clone(), option)
    }

    async fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> StorageResult<Self::ReadSnapshot> {
        Ok(self.new_read_snapshot_impl(epoch.get_epoch(), options.table_id))
    }

    async fn new_vector_writer(&self, options: NewVectorWriterOptions) -> Self::VectorWriter {
        RangeKvLocalStateStore::new(
            self.clone(),
            NewLocalOptions {
                table_id: options.table_id,
                fragment_id: FragmentId::default(),
                op_consistency_level: Default::default(),
                table_option: Default::default(),
                is_replicated: false,
                vnodes: Arc::new(Bitmap::from_bool_slice(&[true])),
                upload_on_flush: true,
            },
        )
    }
}

pub struct RangeKvLocalStateStore<R: RangeKv> {
    mem_table: MemTable,
    vectors: Vec<(Vector, Bytes)>,
    inner: RangeKvStateStore<R>,

    epoch: Option<EpochPair>,

    table_id: TableId,
    op_consistency_level: OpConsistencyLevel,
    vnodes: Arc<Bitmap>,
}

impl<R: RangeKv> RangeKvLocalStateStore<R> {
    pub fn new(inner: RangeKvStateStore<R>, option: NewLocalOptions) -> Self {
        Self {
            inner,
            mem_table: MemTable::new(option.table_id, option.op_consistency_level.clone()),
            epoch: None,
            table_id: option.table_id,
            op_consistency_level: option.op_consistency_level,
            vnodes: option.vnodes,
            vectors: vec![],
        }
    }

    fn epoch(&self) -> u64 {
        self.epoch.expect("should have set the epoch").curr
    }
}

impl<R: RangeKv> StateStoreGet for RangeKvLocalStateStore<R> {
    async fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        _read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        if let Some((key, value)) = match self.mem_table.buffer.get(&key) {
            None => self
                .inner
                .get_keyed_row_impl(key, self.epoch(), self.table_id)?,
            Some(op) => match op {
                KeyOp::Insert(value) | KeyOp::Update((_, value)) => Some((
                    FullKey::new(self.table_id, key, self.epoch()),
                    value.clone(),
                )),
                KeyOp::Delete(_) => None,
            },
        } {
            Ok(Some(on_key_value_fn(key.to_ref(), value.as_ref())?))
        } else {
            Ok(None)
        }
    }
}

impl<R: RangeKv> LocalStateStore for RangeKvLocalStateStore<R> {
    type FlushedSnapshotReader = RangeKvStateStoreReadSnapshot<R>;

    type Iter<'a> = impl StateStoreIter + 'a;
    type RevIter<'a> = impl StateStoreIter + 'a;

    async fn iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::Iter<'_>> {
        let iter = self
            .inner
            .iter_impl(key_range.clone(), self.epoch(), self.table_id)?;
        Ok(FromStreamStateStoreIter::new(Box::pin(merge_stream(
            self.mem_table.iter(key_range),
            iter.into_stream(to_owned_item),
            self.table_id,
            self.epoch(),
            false,
        ))))
    }

    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter<'_>> {
        let iter = self
            .inner
            .rev_iter_impl(key_range.clone(), self.epoch(), self.table_id)?;
        Ok(FromStreamStateStoreIter::new(Box::pin(merge_stream(
            self.mem_table.rev_iter(key_range),
            iter.into_stream(to_owned_item),
            self.table_id,
            self.epoch(),
            true,
        ))))
    }

    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()> {
        match old_val {
            None => self.mem_table.insert(key, new_val)?,
            Some(old_val) => self.mem_table.update(key, old_val, new_val)?,
        };
        Ok(())
    }

    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
        Ok(self.mem_table.delete(key, old_val)?)
    }

    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
        if self.vnodes.len() > 1 {
            TableState::wait_epoch(
                &self.inner.tables,
                self.table_id,
                self.epoch.expect("should have init").prev,
            )
            .await;
        }
        Ok(std::mem::replace(&mut self.vnodes, vnodes))
    }

    fn get_table_watermark(&self, _vnode: VirtualNode) -> Option<Bytes> {
        // TODO: may store the written table watermark and have a correct implementation
        None
    }

    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
        self.inner.new_read_snapshot_impl(MAX_EPOCH, self.table_id)
    }
}

impl<R: RangeKv> StateStoreWriteEpochControl for RangeKvLocalStateStore<R> {
    async fn flush(&mut self) -> StorageResult<usize> {
        let buffer = self.mem_table.drain().into_parts();
        let mut kv_pairs = Vec::with_capacity(buffer.len());
        let sanity_check_read_snapshot = if sanity_check_enabled() {
            Some(self.inner.new_read_snapshot_impl(MAX_EPOCH, self.table_id))
        } else {
            None
        };
        for (key, key_op) in buffer {
            match key_op {
                // Currently, some executors do not strictly comply with these semantics. As
                // a workaround, you may disable the check by initializing the state store
                // with `op_consistency_level=Inconsistent`.
                KeyOp::Insert(value) => {
                    if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
                        do_insert_sanity_check(
                            self.table_id,
                            &key,
                            &value,
                            sanity_check_read_snapshot,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, StorageValue::new_put(value)));
                }
                KeyOp::Delete(old_value) => {
                    if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
                        do_delete_sanity_check(
                            self.table_id,
                            &key,
                            &old_value,
                            sanity_check_read_snapshot,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, StorageValue::new_delete()));
                }
                KeyOp::Update((old_value, new_value)) => {
                    if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
                        do_update_sanity_check(
                            self.table_id,
                            &key,
                            &old_value,
                            &new_value,
                            sanity_check_read_snapshot,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, StorageValue::new_put(new_value)));
                }
            }
        }
        let epoch = self.epoch();
        self.inner
            .ingest_vectors(self.table_id, epoch, take(&mut self.vectors));
        self.inner
            .ingest_batch(kv_pairs, vec![], epoch, self.table_id)
    }

    async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
        assert_eq!(
            self.epoch.replace(options.epoch),
            None,
            "epoch in local state store of table id {:?} is initialized more than once",
            self.table_id
        );
        self.inner
            .tables
            .lock()
            .entry(self.table_id)
            .or_insert_with(|| TableState::new(options.epoch.prev))
            .next_epochs
            .insert(options.epoch.prev, options.epoch.curr);
        if self.vnodes.len() > 1 {
            TableState::wait_epoch(&self.inner.tables, self.table_id, options.epoch.prev).await;
        }

        Ok(())
    }

    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
        assert!(!self.mem_table.is_dirty());
        if let Some(value_checker) = opts.switch_op_consistency_level {
            self.mem_table.op_consistency_level.update(&value_checker);
        }
        let epoch = self
            .epoch
            .as_mut()
            .expect("should have an init epoch before sealing the first epoch");
        let prev_epoch = epoch.curr;
        epoch.prev = prev_epoch;
        epoch.curr = next_epoch;
        assert!(
            next_epoch > prev_epoch,
            "new epoch {} should be greater than current epoch: {}",
            next_epoch,
            prev_epoch
        );

        let mut tables = self.inner.tables.lock();
        let table_state = tables
            .get_mut(&self.table_id)
            .expect("should be set when init");

        table_state.next_epochs.insert(prev_epoch, next_epoch);
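        // An epoch counts as fully sealed only once every vnode has sealed it: each local
        // store ORs its vnode bitmap into the builder below, and when the bitmap is full
        // the epoch is promoted to `latest_sealed_epoch`, which `TableState::wait_epoch`
        // polls.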
        if self.vnodes.len() > 1 {
            let sealing_epoch_vnodes = table_state
                .sealing_epochs
                .entry(prev_epoch)
                .or_insert_with(|| BitmapBuilder::zeroed(self.vnodes.len()));
            assert_eq!(self.vnodes.len(), sealing_epoch_vnodes.len());
            for vnode in self.vnodes.iter_ones() {
                assert!(!sealing_epoch_vnodes.is_set(vnode));
                sealing_epoch_vnodes.set(vnode, true);
            }
            if (0..self.vnodes.len()).all(|vnode| sealing_epoch_vnodes.is_set(vnode)) {
                let (all_sealed_epoch, _) =
                    table_state.sealing_epochs.pop_first().expect("non-empty");
                assert_eq!(
                    all_sealed_epoch, prev_epoch,
                    "new all_sealed_epoch must be the current prev epoch"
                );
                if let Some(prev_latest_sealed_epoch) =
                    table_state.latest_sealed_epoch.replace(prev_epoch)
                {
                    assert!(prev_epoch > prev_latest_sealed_epoch);
                }
            }
        }

        if let Some((direction, watermarks, _watermark_type)) = opts.table_watermarks {
            let delete_ranges = watermarks
                .iter()
                .flat_map(|vnode_watermark| {
                    let inner_range = match direction {
                        WatermarkDirection::Ascending => {
                            (Unbounded, Excluded(vnode_watermark.watermark().clone()))
                        }
                        WatermarkDirection::Descending => {
                            (Excluded(vnode_watermark.watermark().clone()), Unbounded)
                        }
                    };
                    vnode_watermark
                        .vnode_bitmap()
                        .iter_vnodes()
                        .map(move |vnode| {
                            let (start, end) =
                                prefixed_range_with_vnode(inner_range.clone(), vnode);
                            (start.map(|key| key.0), end.map(|key| key.0))
                        })
                })
                .collect_vec();
            if let Err(e) =
                self.inner
                    .ingest_batch(Vec::new(), delete_ranges, self.epoch(), self.table_id)
            {
                error!(error = %e.as_report(), "failed to write delete ranges of table watermark");
            }
        }
    }

    async fn try_flush(&mut self) -> StorageResult<()> {
        Ok(())
    }
}

impl<R: RangeKv> StateStoreWriteVector for RangeKvLocalStateStore<R> {
    fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()> {
        self.vectors.push((vec.to_owned_scalar(), info));
        Ok(())
    }
}

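/// Iterates committed entries visible at `epoch`, yielding only the newest visible version
/// of each user key and skipping tombstones. `is_inclusive_epoch` controls whether entries
/// written exactly at `epoch` are visible; `iter_log` passes an exclusive bound for its
/// old-value iterator so that it observes the state strictly before `min_epoch`.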
pub struct RangeKvStateStoreIter<R: RangeKv> {
    inner: batched_iter::Iter<R>,

    epoch: HummockEpoch,
    is_inclusive_epoch: bool,

    last_key: Option<UserKey<Bytes>>,

    item_buffer: Option<StateStoreKeyedRow>,
}

impl<R: RangeKv> RangeKvStateStoreIter<R> {
    pub fn new(
        inner: batched_iter::Iter<R>,
        epoch: HummockEpoch,
        is_inclusive_epoch: bool,
    ) -> Self {
        Self {
            inner,
            epoch,
            is_inclusive_epoch,
            last_key: None,
            item_buffer: None,
        }
    }
}

impl<R: RangeKv> StateStoreIter for RangeKvStateStoreIter<R> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        self.next_inner()?;
        Ok(self
            .item_buffer
            .as_ref()
            .map(|(key, value)| (key.to_ref(), value.as_ref())))
    }
}

impl<R: RangeKv> RangeKvStateStoreIter<R> {
    fn next_inner(&mut self) -> StorageResult<()> {
        self.item_buffer = None;
        while let Some((key, value)) = self.inner.next()? {
            let epoch = key.epoch_with_gap.pure_epoch();
            if epoch > self.epoch {
                continue;
            }
            if epoch == self.epoch && !self.is_inclusive_epoch {
                continue;
            }
            if Some(key.user_key.as_ref()) != self.last_key.as_ref().map(|key| key.as_ref()) {
                self.last_key = Some(key.user_key.clone());
                if let Some(value) = value {
                    self.item_buffer = Some((key, value));
                    break;
                }
            }
        }
        Ok(())
    }
}

pub struct RangeKvStateStoreRevIter<R: RangeKv> {
    inner: batched_iter::Iter<R>,

    epoch: HummockEpoch,
    is_inclusive_epoch: bool,

    item_buffer: VecDeque<StateStoreKeyedRow>,
}

impl<R: RangeKv> RangeKvStateStoreRevIter<R> {
    pub fn new(
        inner: batched_iter::Iter<R>,
        epoch: HummockEpoch,
        is_inclusive_epoch: bool,
    ) -> Self {
        Self {
            inner,
            epoch,
            is_inclusive_epoch,
            item_buffer: VecDeque::default(),
        }
    }
}

impl<R: RangeKv> StateStoreIter for RangeKvStateStoreRevIter<R> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        self.next_inner()?;
        Ok(self
            .item_buffer
            .back()
            .map(|(key, value)| (key.to_ref(), value.as_ref())))
    }
}

impl<R: RangeKv> RangeKvStateStoreRevIter<R> {
    fn next_inner(&mut self) -> StorageResult<()> {
        self.item_buffer.pop_back();
        while let Some((key, value)) = self.inner.next()? {
            let epoch = key.epoch_with_gap.pure_epoch();
            if epoch > self.epoch {
                continue;
            }
            if epoch == self.epoch && !self.is_inclusive_epoch {
                continue;
            }

            let v = match value {
                Some(v) => v,
                None => {
                    if let Some(last_key) = self.item_buffer.front()
                        && key.user_key.as_ref() == last_key.0.user_key.as_ref()
                    {
                        self.item_buffer.clear();
                    }
                    continue;
                }
            };

            if let Some(last_key) = self.item_buffer.front() {
                if key.user_key.as_ref() != last_key.0.user_key.as_ref() {
                    self.item_buffer.push_front((key, v));
                    break;
                } else {
                    self.item_buffer.pop_front();
                    self.item_buffer.push_front((key, v));
                }
            } else {
                self.item_buffer.push_front((key, v));
            }
        }
        Ok(())
    }
}

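/// Merges a new-value iterator (state at `max_epoch`, inclusive) with an old-value iterator
/// (state strictly before `min_epoch`) to reconstruct a change log: a key present only on
/// the new side yields `Insert`, only on the old side yields `Delete`, and present on both
/// sides with differing values yields `Update`; keys whose values are equal on both sides
/// are skipped.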
1427pub struct RangeKvStateStoreChangeLogIter<R: RangeKv> {
1428    new_value_iter: RangeKvStateStoreIter<R>,
1429    old_value_iter: RangeKvStateStoreIter<R>,
1430    item_buffer: Option<(TableKey<Bytes>, ChangeLogValue<Bytes>)>,
1431}
1432
1433impl<R: RangeKv> RangeKvStateStoreChangeLogIter<R> {
1434    fn new(
1435        mut new_value_iter: RangeKvStateStoreIter<R>,
1436        mut old_value_iter: RangeKvStateStoreIter<R>,
1437    ) -> StorageResult<Self> {
1438        new_value_iter.next_inner()?;
1439        old_value_iter.next_inner()?;
1440        Ok(Self {
1441            new_value_iter,
1442            old_value_iter,
1443            item_buffer: None,
1444        })
1445    }
1446}
1447
1448impl<R: RangeKv> StateStoreIter<StateStoreReadLogItem> for RangeKvStateStoreChangeLogIter<R> {
1449    async fn try_next(&mut self) -> StorageResult<Option<StateStoreReadLogItemRef<'_>>> {
1450        loop {
1451            match (
1452                &self.new_value_iter.item_buffer,
1453                &self.old_value_iter.item_buffer,
1454            ) {
1455                (None, None) => {
1456                    self.item_buffer = None;
1457                    break;
1458                }
1459                (Some((key, new_value)), None) => {
1460                    self.item_buffer = Some((
1461                        key.user_key.table_key.clone(),
1462                        ChangeLogValue::Insert(new_value.clone()),
1463                    ));
1464                    self.new_value_iter.next_inner()?;
1465                }
1466                (None, Some((key, old_value))) => {
1467                    self.item_buffer = Some((
1468                        key.user_key.table_key.clone(),
1469                        ChangeLogValue::Delete(old_value.clone()),
1470                    ));
1471                    self.old_value_iter.next_inner()?;
1472                }
1473                (Some((new_value_key, new_value)), Some((old_value_key, old_value))) => {
1474                    match new_value_key.user_key.cmp(&old_value_key.user_key) {
1475                        Ordering::Less => {
1476                            self.item_buffer = Some((
1477                                new_value_key.user_key.table_key.clone(),
1478                                ChangeLogValue::Insert(new_value.clone()),
1479                            ));
1480                            self.new_value_iter.next_inner()?;
1481                        }
1482                        Ordering::Greater => {
1483                            self.item_buffer = Some((
1484                                old_value_key.user_key.table_key.clone(),
1485                                ChangeLogValue::Delete(old_value.clone()),
1486                            ));
1487                            self.old_value_iter.next_inner()?;
1488                        }
1489                        Ordering::Equal => {
1490                            if new_value == old_value {
1491                                self.new_value_iter.next_inner()?;
1492                                self.old_value_iter.next_inner()?;
1493                                continue;
1494                            }
1495                            self.item_buffer = Some((
1496                                new_value_key.user_key.table_key.clone(),
1497                                ChangeLogValue::Update {
1498                                    new_value: new_value.clone(),
1499                                    old_value: old_value.clone(),
1500                                },
1501                            ));
1502                            self.new_value_iter.next_inner()?;
1503                            self.old_value_iter.next_inner()?;
1504                        }
1505                    }
1506                }
1507            };
1508            break;
1509        }
1510        Ok(self
1511            .item_buffer
1512            .as_ref()
1513            .map(|(key, value)| (key.to_ref(), value.to_ref())))
1514    }
1515}
1516
1517#[cfg(test)]
1518mod tests {
1519    use risingwave_common::util::epoch::test_epoch;
1520
1521    use super::*;
1522    use crate::hummock::iterator::test_utils::{
1523        iterator_test_table_key_of, iterator_test_value_of,
1524    };
1525    use crate::hummock::test_utils::{ReadOptions, *};
1526    use crate::memory::sled::SledStateStore;
1527
1528    #[tokio::test]
1529    async fn test_snapshot_isolation_memory() {
1530        let state_store = MemoryStateStore::new();
1531        test_snapshot_isolation_inner(state_store).await;
1532    }
1533
1534    #[cfg(not(madsim))]
1535    #[tokio::test]
1536    async fn test_snapshot_isolation_sled() {
1537        let state_store = SledStateStore::new_temp();
1538        test_snapshot_isolation_inner(state_store).await;
1539    }
1540

    async fn test_snapshot_isolation_inner(state_store: RangeKvStateStore<impl RangeKv>) {
        // Seed two keys at epoch 0, then overwrite `a` and delete `b` at epoch 1.
        state_store
            .ingest_batch(
                vec![
                    (
                        TableKey(Bytes::from(b"a".to_vec())),
                        StorageValue::new_put(b"v1".to_vec()),
                    ),
                    (
                        TableKey(Bytes::from(b"b".to_vec())),
                        StorageValue::new_put(b"v1".to_vec()),
                    ),
                ],
                vec![],
                0,
                Default::default(),
            )
            .unwrap();
        state_store
            .ingest_batch(
                vec![
                    (
                        TableKey(Bytes::from(b"a".to_vec())),
                        StorageValue::new_put(b"v2".to_vec()),
                    ),
                    (
                        TableKey(Bytes::from(b"b".to_vec())),
                        StorageValue::new_delete(),
                    ),
                ],
                vec![],
                test_epoch(1),
                Default::default(),
            )
            .unwrap();
        // A scan at epoch 0 still sees the original snapshot.
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    0,
                    TableId::default(),
                    None,
                )
                .unwrap(),
            vec![
                (
                    FullKey::for_test(Default::default(), Bytes::from("a"), 0)
                        .encode()
                        .into(),
                    b"v1".to_vec().into()
                ),
                (
                    FullKey::for_test(Default::default(), Bytes::from("b"), 0)
                        .encode()
                        .into(),
                    b"v1".to_vec().into()
                )
            ]
        );
        // A limit of 1 truncates the scan result to the first key.
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    0,
                    TableId::default(),
                    Some(1),
                )
                .unwrap(),
            vec![(
                FullKey::for_test(Default::default(), b"a".to_vec(), 0)
                    .encode()
                    .into(),
                b"v1".to_vec().into()
            )]
        );
        // A scan at epoch 1 sees the overwrite of `a` and the deletion of `b`.
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    test_epoch(1),
                    TableId::default(),
                    None,
                )
                .unwrap(),
            vec![(
                FullKey::for_test(Default::default(), b"a".to_vec(), test_epoch(1))
                    .encode()
                    .into(),
                b"v2".to_vec().into()
            )]
        );
        // Point gets observe the same per-epoch snapshots.
        assert_eq!(
            state_store
                .get(TableKey(Bytes::from("a")), 0, ReadOptions::default())
                .await
                .unwrap(),
            Some(Bytes::from("v1"))
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"b")),
                    0,
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            Some(b"v1".to_vec().into())
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"c")),
                    0,
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            None
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"a")),
                    test_epoch(1),
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            Some(b"v2".to_vec().into())
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::from("b")),
                    test_epoch(1),
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            None
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::from("c")),
                    test_epoch(1),
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            None
        );
    }
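
    // A minimal extra sketch of `scan`'s exclusive upper bound, assuming it
    // follows the same half-open range semantics as the underlying
    // `RangeKv::range`: with `b` excluded, only `a` should come back. The
    // test name is ours; only APIs already exercised above are used.
    #[tokio::test]
    async fn test_scan_excluded_bound_memory() {
        let state_store = MemoryStateStore::new();
        state_store
            .ingest_batch(
                vec![
                    (
                        TableKey(Bytes::from(b"a".to_vec())),
                        StorageValue::new_put(b"v1".to_vec()),
                    ),
                    (
                        TableKey(Bytes::from(b"b".to_vec())),
                        StorageValue::new_put(b"v1".to_vec()),
                    ),
                ],
                vec![],
                0,
                Default::default(),
            )
            .unwrap();
        // `b` sits exactly on the excluded bound, so it must be dropped.
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Excluded(TableKey(Bytes::from("b"))),
                    ),
                    0,
                    TableId::default(),
                    None,
                )
                .unwrap(),
            vec![(
                FullKey::for_test(Default::default(), Bytes::from("a"), 0)
                    .encode()
                    .into(),
                b"v1".to_vec().into()
            )]
        );
    }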

    #[tokio::test]
    async fn test_iter_log_memory() {
        let state_store = MemoryStateStore::new();
        test_iter_log_inner(state_store).await;
    }

    #[cfg(not(madsim))]
    #[tokio::test]
    async fn test_iter_log_sled() {
        let state_store = SledStateStore::new_temp();
        test_iter_log_inner(state_store).await;
    }

    async fn test_iter_log_inner(state_store: RangeKvStateStore<impl RangeKv>) {
        let table_id = TableId::new(233);
        let epoch1 = test_epoch(1);
        let key_idx = [1, 2, 4];
        let make_key = |i| TableKey(Bytes::from(iterator_test_table_key_of(i)));
        let make_value = |i| Bytes::from(iterator_test_value_of(i));
        state_store
            .ingest_batch(
                key_idx
                    .iter()
                    .map(|i| (make_key(*i), StorageValue::new_put(make_value(*i))))
                    .collect(),
                vec![],
                epoch1,
                table_id,
            )
            .unwrap();
        // check the iter log of the first epoch: every key shows up as an insert
        {
            let mut iter = state_store
                .iter_log(
                    (epoch1, epoch1),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            for i in key_idx {
                let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(make_key(i).to_ref(), iter_key);
                assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }

        let epoch2 = test_epoch(2);
        state_store
            .ingest_batch(
                vec![
                    (make_key(1), StorageValue::new_put(make_value(12))), // update
                    (make_key(2), StorageValue::new_delete()),            // delete
                    (make_key(3), StorageValue::new_put(make_value(3))),  // insert
                ],
                vec![],
                epoch2,
                table_id,
            )
            .unwrap();

        // check the iter log of the second epoch
        {
            let expected = vec![
                (
                    make_key(1),
                    ChangeLogValue::Update {
                        new_value: make_value(12),
                        old_value: make_value(1),
                    },
                ),
                (make_key(2), ChangeLogValue::Delete(make_value(2))),
                (make_key(3), ChangeLogValue::Insert(make_value(3))),
            ];
            let mut iter = state_store
                .iter_log(
                    (epoch2, epoch2),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            for (key, change_log_value) in expected {
                let (iter_key, iter_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(
                    key.to_ref(),
                    iter_key,
                    "{:?} {:?}",
                    change_log_value.to_ref(),
                    iter_value
                );
                assert_eq!(change_log_value.to_ref(), iter_value);
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }
        // check that the iter log of the original old epoch is unchanged
        {
            let mut iter = state_store
                .iter_log(
                    (epoch1, epoch1),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            for i in key_idx {
                let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(make_key(i).to_ref(), iter_key);
                assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }
        // check the iter log when merging the two epochs: each key collapses to
        // its net change, so key 1's insert-then-update becomes a single insert
        // of the latest value, and key 2's insert-then-delete cancels out
        {
            let mut iter = state_store
                .iter_log(
                    (epoch1, epoch2),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
            assert_eq!(make_key(1).to_ref(), iter_key);
            assert_eq!(
                change_value,
                ChangeLogValue::Insert(make_value(12).as_ref())
            );
            for i in [3, 4] {
                let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(make_key(i).to_ref(), iter_key);
                assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }
    }
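
    // A minimal sketch exercising the `Ordering::Equal` branch of the
    // change-log merge above, assuming old values are derived from the prior
    // snapshot as in the update case tested earlier: re-ingesting a key with
    // an identical value should yield no change-log entry. The test name is
    // ours; only APIs already used in this module are exercised.
    #[tokio::test]
    async fn test_iter_log_no_op_overwrite_memory() {
        let state_store = MemoryStateStore::new();
        let table_id = TableId::new(234);
        let key = TableKey(Bytes::from(iterator_test_table_key_of(1)));
        let value = Bytes::from(iterator_test_value_of(1));
        let epoch1 = test_epoch(1);
        state_store
            .ingest_batch(
                vec![(key.clone(), StorageValue::new_put(value.clone()))],
                vec![],
                epoch1,
                table_id,
            )
            .unwrap();
        // Write the same key with the same value again at a later epoch.
        let epoch2 = test_epoch(2);
        state_store
            .ingest_batch(
                vec![(key, StorageValue::new_put(value))],
                vec![],
                epoch2,
                table_id,
            )
            .unwrap();
        // The new value equals the old value, so the merged change log of
        // epoch2 is expected to be empty.
        let mut iter = state_store
            .iter_log(
                (epoch2, epoch2),
                (Unbounded, Unbounded),
                ReadLogOptions { table_id },
            )
            .await
            .unwrap();
        assert!(iter.try_next().await.unwrap().is_none());
    }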
}