// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::mem::take;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::ops::{Bound, RangeBounds};
use std::sync::{Arc, LazyLock};

use bytes::Bytes;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::array::VectorRef;
use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::dispatch_distance_measurement;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::types::ScalarRef;
use risingwave_common::util::epoch::{EpochPair, MAX_EPOCH};
use risingwave_hummock_sdk::key::{
    FullKey, TableKey, TableKeyRange, UserKey, prefixed_range_with_vnode,
};
use risingwave_hummock_sdk::table_watermark::WatermarkDirection;
use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
use thiserror_ext::AsReport;
use tokio::task::yield_now;
use tracing::error;

use crate::error::StorageResult;
use crate::hummock::HummockError;
use crate::hummock::utils::{
    do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, merge_stream,
    sanity_check_enabled,
};
use crate::mem_table::{KeyOp, MemTable};
use crate::storage_value::StorageValue;
use crate::store::*;
use crate::vector::{MeasureDistanceBuilder, NearestBuilder};

pub type BytesFullKey = FullKey<Bytes>;
pub type BytesFullKeyRange = (Bound<BytesFullKey>, Bound<BytesFullKey>);
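
/// Storage backend abstraction over which [`RangeKvStateStore`] is generic: ranged scans in both
/// directions, batched ingestion, and flushing. It is implemented for a locked [`BTreeMap`] (the
/// pure in-memory store) and, in the [`sled`] module, for a `sled`-backed store.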
#[allow(clippy::type_complexity)]
pub trait RangeKv: Clone + Send + Sync + 'static {
    fn range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>>;

    fn rev_range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>>;

    fn ingest_batch(
        &self,
        kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
    ) -> StorageResult<()>;

    fn flush(&self) -> StorageResult<()>;
}

pub type BTreeMapRangeKv = Arc<RwLock<BTreeMap<BytesFullKey, Option<Bytes>>>>;

impl RangeKv for BTreeMapRangeKv {
    fn range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
        let limit = limit.unwrap_or(usize::MAX);
        Ok(self
            .read()
            .range(range)
            .take(limit)
            .map(|(key, value)| (key.clone(), value.clone()))
            .collect())
    }

    fn rev_range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
        let limit = limit.unwrap_or(usize::MAX);
        Ok(self
            .read()
            .range(range)
            .rev()
            .take(limit)
            .map(|(key, value)| (key.clone(), value.clone()))
            .collect())
    }

    fn ingest_batch(
        &self,
        kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
    ) -> StorageResult<()> {
        let mut inner = self.write();
        for (key, value) in kv_pairs {
            inner.insert(key, value);
        }
        Ok(())
    }

    fn flush(&self) -> StorageResult<()> {
        Ok(())
    }
}

pub mod sled {
    use std::fs::create_dir_all;
    use std::ops::RangeBounds;

    use bytes::Bytes;
    use risingwave_hummock_sdk::key::FullKey;

    use crate::error::StorageResult;
    use crate::memory::{BytesFullKey, BytesFullKeyRange, RangeKv, RangeKvStateStore};

    #[derive(Clone)]
    pub struct SledRangeKv {
        inner: sled::Db,
    }

    impl SledRangeKv {
        pub fn new(path: impl AsRef<std::path::Path>) -> Self {
            SledRangeKv {
                inner: sled::open(path).expect("open"),
            }
        }

        pub fn new_temp() -> Self {
            create_dir_all("./.risingwave/sled").expect("should create");
            let path = tempfile::TempDir::new_in("./.risingwave/sled")
                .expect("find temp dir")
                .keep();
            Self::new(path)
        }
    }
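
    // Each stored value carries a one-byte tag: a lone `EMPTY` byte encodes a tombstone
    // (`None`), while `NON_EMPTY` is followed by the raw value payload.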
    const EMPTY: u8 = 1;
    const NON_EMPTY: u8 = 0;

    impl RangeKv for SledRangeKv {
        fn range(
            &self,
            range: BytesFullKeyRange,
            limit: Option<usize>,
        ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
            let (left, right) = range;
            let full_key_ref_bound = (
                left.as_ref().map(FullKey::to_ref),
                right.as_ref().map(FullKey::to_ref),
            );
            let left_encoded = left.as_ref().map(|key| key.to_ref().encode_reverse_epoch());
            let right_encoded = right
                .as_ref()
                .map(|key| key.to_ref().encode_reverse_epoch());
            let limit = limit.unwrap_or(usize::MAX);
            let mut ret = vec![];
            for result in self.inner.range((left_encoded, right_encoded)).take(limit) {
                let (key, value) = result?;
                let full_key = FullKey::decode_reverse_epoch(key.as_ref()).copy_into();
                if !full_key_ref_bound.contains(&full_key.to_ref()) {
                    continue;
                }
                let value = match value.as_ref() {
                    [EMPTY] => None,
                    [NON_EMPTY, rest @ ..] => Some(Bytes::from(Vec::from(rest))),
                    _ => unreachable!("malformed value: {:?}", value),
                };
                ret.push((full_key, value))
            }
            Ok(ret)
        }

        fn rev_range(
            &self,
            range: BytesFullKeyRange,
            limit: Option<usize>,
        ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
            let (left, right) = range;
            let full_key_ref_bound = (
                left.as_ref().map(FullKey::to_ref),
                right.as_ref().map(FullKey::to_ref),
            );
            let left_encoded = left.as_ref().map(|key| key.to_ref().encode_reverse_epoch());
            let right_encoded = right
                .as_ref()
                .map(|key| key.to_ref().encode_reverse_epoch());
            let limit = limit.unwrap_or(usize::MAX);
            let mut ret = vec![];
            for result in self
                .inner
                .range((left_encoded, right_encoded))
                .rev()
                .take(limit)
            {
                let (key, value) = result?;
                let full_key = FullKey::decode_reverse_epoch(key.as_ref()).copy_into();
                if !full_key_ref_bound.contains(&full_key.to_ref()) {
                    continue;
                }
                let value = match value.as_ref() {
                    [EMPTY] => None,
                    [NON_EMPTY, rest @ ..] => Some(Bytes::from(Vec::from(rest))),
                    _ => unreachable!("malformed value: {:?}", value),
                };
                ret.push((full_key, value))
            }
            Ok(ret)
        }

        fn ingest_batch(
            &self,
            kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
        ) -> StorageResult<()> {
            let mut batch = sled::Batch::default();
            for (key, value) in kv_pairs {
                let encoded_key = key.encode_reverse_epoch();
                let key = sled::IVec::from(encoded_key);
                let mut buffer =
                    Vec::with_capacity(value.as_ref().map(|v| v.len()).unwrap_or_default() + 1);
                if let Some(value) = value {
                    buffer.push(NON_EMPTY);
                    buffer.extend_from_slice(value.as_ref());
                } else {
                    buffer.push(EMPTY);
                }
                let value = sled::IVec::from(buffer);
                batch.insert(key, value);
            }
            self.inner.apply_batch(batch)?;
            Ok(())
        }

        fn flush(&self) -> StorageResult<()> {
            Ok(self.inner.flush().map(|_| {})?)
        }
    }

    pub type SledStateStore = RangeKvStateStore<SledRangeKv>;

    impl SledStateStore {
        pub fn new(path: impl AsRef<std::path::Path>) -> Self {
            RangeKvStateStore {
                inner: SledRangeKv::new(path),
                tables: Default::default(),
                vectors: Default::default(),
            }
        }

        pub fn new_temp() -> Self {
            RangeKvStateStore {
                inner: SledRangeKv::new_temp(),
                tables: Default::default(),
                vectors: Default::default(),
            }
        }
    }

    #[cfg(test)]
    mod test {
        use std::ops::{Bound, RangeBounds};

        use bytes::Bytes;
        use risingwave_common::catalog::TableId;
        use risingwave_common::util::epoch::EPOCH_SPILL_TIME_MASK;
        use risingwave_hummock_sdk::EpochWithGap;
        use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};

        use crate::memory::RangeKv;
        use crate::memory::sled::SledRangeKv;

        #[test]
        fn test_filter_variable_key_length_false_positive() {
            let table_id = TableId { table_id: 233 };
            let epoch = u64::MAX - u64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]);
            let excluded_short_table_key = [0, 1, 0, 0];
            let included_long_table_key = [0, 1, 0, 0, 1, 2];
            let left_table_key = [0, 1, 0, 0, 1];
            let right_table_key = [0, 1, 1, 1];

            let to_full_key = |table_key: &[u8]| FullKey {
                user_key: UserKey {
                    table_id,
                    table_key: TableKey(Bytes::from(table_key.to_vec())),
                },
                epoch_with_gap: EpochWithGap::new_from_epoch(epoch & !EPOCH_SPILL_TIME_MASK),
            };

            let left_full_key = to_full_key(&left_table_key[..]);
            let right_full_key = to_full_key(&right_table_key[..]);
            let included_long_full_key = to_full_key(&included_long_table_key[..]);
            let excluded_short_full_key = to_full_key(&excluded_short_table_key[..]);

            assert!(
                (
                    Bound::Included(left_full_key.to_ref()),
                    Bound::Included(right_full_key.to_ref())
                )
                    .contains(&included_long_full_key.to_ref())
            );
            assert!(
                !(
                    Bound::Included(left_full_key.to_ref()),
                    Bound::Included(right_full_key.to_ref())
                )
                    .contains(&excluded_short_full_key.to_ref())
            );

            let left_encoded = left_full_key.encode_reverse_epoch();
            let right_encoded = right_full_key.encode_reverse_epoch();

            assert!(
                (
                    Bound::Included(left_encoded.clone()),
                    Bound::Included(right_encoded.clone())
                )
                    .contains(&included_long_full_key.encode_reverse_epoch())
            );
            assert!(
                (
                    Bound::Included(left_encoded),
                    Bound::Included(right_encoded)
                )
                    .contains(&excluded_short_full_key.encode_reverse_epoch())
            );

            let sled_range_kv = SledRangeKv::new_temp();
            sled_range_kv
                .ingest_batch(
                    vec![
                        (included_long_full_key.clone(), None),
                        (excluded_short_full_key, None),
                    ]
                    .into_iter(),
                )
                .unwrap();
            let kvs = sled_range_kv
                .range(
                    (
                        Bound::Included(left_full_key),
                        Bound::Included(right_full_key),
                    ),
                    None,
                )
                .unwrap();
            assert_eq!(1, kvs.len());
            assert_eq!(included_long_full_key.to_ref(), kvs[0].0.to_ref());
            assert!(kvs[0].1.is_none());
        }
    }
}

mod batched_iter {

    use super::*;

    /// A utility struct for iterating over a range of keys in a locked `BTreeMap`. It batches
    /// records to trade the copying overhead against the number of times the lock is acquired.
    ///
    /// As a consequence, it is not guaranteed that we iterate over a consistent snapshot of the
    /// map. Users should handle MVCC by themselves.
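    ///
    /// # Example
    ///
    /// A minimal sketch of how the iterator is driven (`Iter` is module-private, so this is
    /// illustrative rather than a compiled doctest):
    ///
    /// ```ignore
    /// use std::ops::Bound;
    ///
    /// // Drain an entire `BTreeMapRangeKv` forward, refilling in batches of `BATCH_SIZE`.
    /// let store: BTreeMapRangeKv = Default::default();
    /// let mut iter = Iter::new(store, (Bound::Unbounded, Bound::Unbounded), false);
    /// while let Some((key, value)) = iter.next().unwrap() {
    ///     // Writes that land between two batch refills may be observed here.
    ///     let _ = (key, value);
    /// }
    /// ```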
    pub struct Iter<R: RangeKv> {
        inner: R,
        range: BytesFullKeyRange,
        current: std::vec::IntoIter<(FullKey<Bytes>, Option<Bytes>)>,
        rev: bool,
    }

    impl<R: RangeKv> Iter<R> {
        pub fn new(inner: R, range: BytesFullKeyRange, rev: bool) -> Self {
            Self {
                inner,
                range,
                rev,
                current: Vec::new().into_iter(),
            }
        }
    }

    impl<R: RangeKv> Iter<R> {
        const BATCH_SIZE: usize = 256;

        /// Get the next batch of records and fill the `current` buffer.
        fn refill(&mut self) -> StorageResult<()> {
            assert!(self.current.is_empty());

            let batch = if self.rev {
                self.inner.rev_range(
                    (self.range.0.clone(), self.range.1.clone()),
                    Some(Self::BATCH_SIZE),
                )?
            } else {
                self.inner.range(
                    (self.range.0.clone(), self.range.1.clone()),
                    Some(Self::BATCH_SIZE),
                )?
            };

            if let Some((last_key, _)) = batch.last() {
                let full_key = FullKey::new_with_gap_epoch(
                    last_key.user_key.table_id,
                    TableKey(last_key.user_key.table_key.0.clone()),
                    last_key.epoch_with_gap,
                );
                if self.rev {
                    self.range.1 = Bound::Excluded(full_key);
                } else {
                    self.range.0 = Bound::Excluded(full_key);
                }
            }
            self.current = batch.into_iter();
            Ok(())
        }
    }

    impl<R: RangeKv> Iter<R> {
        #[allow(clippy::type_complexity)]
        pub fn next(&mut self) -> StorageResult<Option<(BytesFullKey, Option<Bytes>)>> {
            match self.current.next() {
                Some((key, value)) => Ok(Some((key, value))),
                None => {
                    self.refill()?;
                    Ok(self.current.next())
                }
            }
        }
    }

    #[cfg(test)]
    mod tests {
        use rand::Rng;

        use super::*;
        use crate::memory::sled::SledRangeKv;

        #[test]
        fn test_btreemap_iter_chaos() {
            let map = Arc::new(RwLock::new(BTreeMap::new()));
            test_iter_chaos_inner(map, 1000);
        }

        #[cfg(not(madsim))]
        #[test]
        fn test_sled_iter_chaos() {
            let map = SledRangeKv::new_temp();
            test_iter_chaos_inner(map, 100);
        }

        fn test_iter_chaos_inner(map: impl RangeKv, count: usize) {
            let key_range = 1..=10000;
            let num_to_bytes = |k: i32| Bytes::from(format!("{:06}", k).as_bytes().to_vec());
            let num_to_full_key =
                |k: i32| FullKey::new(TableId::default(), TableKey(num_to_bytes(k)), 0);
            #[allow(clippy::mutable_key_type)]
            map.ingest_batch(key_range.clone().map(|k| {
                let key = num_to_full_key(k);
                let b = key.user_key.table_key.0.clone();

                (key, Some(b))
            }))
            .unwrap();

            let rand_bound = || {
                let key = rand::rng().random_range(key_range.clone());
                let key = num_to_full_key(key);
                match rand::rng().random_range(1..=5) {
                    1 | 2 => Bound::Included(key),
                    3 | 4 => Bound::Excluded(key),
                    _ => Bound::Unbounded,
                }
            };

            for _ in 0..count {
                let range = loop {
                    let range = (rand_bound(), rand_bound());
                    let (start, end) = (range.start_bound(), range.end_bound());

                    // Filter out invalid ranges. Code migrated from `BTreeMap::range`.
                    match (start, end) {
                        (Bound::Excluded(s), Bound::Excluded(e)) if s == e => {
                            continue;
                        }
                        (
                            Bound::Included(s) | Bound::Excluded(s),
                            Bound::Included(e) | Bound::Excluded(e),
                        ) if s > e => {
                            continue;
                        }
                        _ => break range,
                    }
                };

                let v1 = {
                    let mut v = vec![];
                    let mut iter = Iter::new(map.clone(), range.clone(), false);
                    while let Some((key, value)) = iter.next().unwrap() {
                        v.push((key, value));
                    }
                    v
                };
                let v2 = map.range(range, None).unwrap();
                // Items iterated from the batched iterator should be the same as those from the
                // normal iterator.
                assert_eq!(v1, v2);
            }
        }
    }
}

pub type MemoryStateStore = RangeKvStateStore<BTreeMapRangeKv>;

struct TableState {
    init_epoch: u64,
    next_epochs: BTreeMap<u64, u64>,
    latest_sealed_epoch: Option<u64>,
    sealing_epochs: BTreeMap<u64, BitmapBuilder>,
}

impl TableState {
    fn new(init_epoch: u64) -> Self {
        Self {
            init_epoch,
            next_epochs: Default::default(),
            latest_sealed_epoch: None,
            sealing_epochs: Default::default(),
        }
    }
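
    /// Poll, yielding cooperatively, until `epoch` is visible for `table_id`: either it is the
    /// table's init epoch, or it is no newer than the latest epoch sealed by all vnodes.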
    async fn wait_epoch(
        tables: &parking_lot::Mutex<HashMap<TableId, Self>>,
        table_id: TableId,
        epoch: u64,
    ) {
        loop {
            {
                let tables = tables.lock();
                let table_state = tables.get(&table_id).expect("should exist");
                assert!(epoch >= table_state.init_epoch);
                if epoch == table_state.init_epoch {
                    return;
                }
                if let Some(latest_sealed_epoch) = table_state.latest_sealed_epoch
                    && latest_sealed_epoch >= epoch
                {
                    return;
                }
            }
            yield_now().await;
        }
    }
}

type InMemVectorStore = Arc<RwLock<HashMap<TableId, Vec<(Vector, Bytes, u64)>>>>;

/// An in-memory state store.
///
/// The in-memory state store is a [`BTreeMap`], which maps [`FullKey`] to values. It never does
/// GC, so its memory usage grows without bound; therefore, the in-memory state store must never
/// be used in production.
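///
/// # Example
///
/// A minimal sketch of writing at an epoch and reading it back. `ingest_batch` and `scan` are
/// crate-private, so this is illustrative rather than a compiled doctest:
///
/// ```ignore
/// let store = MemoryStateStore::new();
/// store
///     .ingest_batch(
///         vec![(
///             TableKey(Bytes::from(b"key".to_vec())),
///             StorageValue::new_put(b"value".to_vec()),
///         )],
///         vec![],
///         0, // epoch
///         TableId::default(),
///     )
///     .unwrap();
/// // A scan at the same epoch observes the write.
/// let kvs = store
///     .scan(
///         (Bound::Unbounded, Bound::Unbounded),
///         0,
///         TableId::default(),
///         None,
///     )
///     .unwrap();
/// assert_eq!(1, kvs.len());
/// ```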
#[derive(Clone, Default)]
pub struct RangeKvStateStore<R: RangeKv> {
    /// Stores (key, epoch) -> user value.
    inner: R,
    /// `table_id` -> `prev_epoch` -> `curr_epoch`
    tables: Arc<parking_lot::Mutex<HashMap<TableId, TableState>>>,

    vectors: InMemVectorStore,
}
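
/// Convert a range of table keys under `table_id` into a range over [`FullKey`].
///
/// Full keys sharing a user key are ordered by epoch descending (newest first), so an `Included`
/// start bound is paired with `HummockEpoch::MAX` and an `Included` end bound with epoch `0`,
/// covering every version of the boundary keys; `Excluded` bounds use the opposite extremes. An
/// `Unbounded` end is clamped to the start of the next table id, if any.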
fn to_full_key_range<R, B>(table_id: TableId, table_key_range: R) -> BytesFullKeyRange
where
    R: RangeBounds<B> + Send,
    B: AsRef<[u8]>,
{
    let start = match table_key_range.start_bound() {
        Included(k) => Included(FullKey::new(
            table_id,
            TableKey(Bytes::from(k.as_ref().to_vec())),
            HummockEpoch::MAX,
        )),
        Excluded(k) => Excluded(FullKey::new(
            table_id,
            TableKey(Bytes::from(k.as_ref().to_vec())),
            0,
        )),
        Unbounded => Included(FullKey::new(
            table_id,
            TableKey(Bytes::from(b"".to_vec())),
            HummockEpoch::MAX,
        )),
    };
    let end = match table_key_range.end_bound() {
        Included(k) => Included(FullKey::new(
            table_id,
            TableKey(Bytes::from(k.as_ref().to_vec())),
            0,
        )),
        Excluded(k) => Excluded(FullKey::new(
            table_id,
            TableKey(Bytes::from(k.as_ref().to_vec())),
            HummockEpoch::MAX,
        )),
        Unbounded => {
            if let Some(next_table_id) = table_id.table_id().checked_add(1) {
                Excluded(FullKey::new(
                    next_table_id.into(),
                    TableKey(Bytes::from(b"".to_vec())),
                    HummockEpoch::MAX,
                ))
            } else {
                Unbounded
            }
        }
    };
    (start, end)
}

impl MemoryStateStore {
    pub fn new() -> Self {
        Self::default()
    }
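
    /// Return a process-wide shared instance, so that separate components (e.g. in tests) can
    /// observe each other's writes.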
    pub fn shared() -> Self {
        static STORE: LazyLock<MemoryStateStore> = LazyLock::new(MemoryStateStore::new);
        STORE.clone()
    }
}

impl<R: RangeKv> RangeKvStateStore<R> {
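    /// Return, for each user key in `key_range`, the newest version visible at `epoch` (skipping
    /// tombstones), up to `limit` entries. Keys are returned in encoded [`FullKey`] form.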
    fn scan(
        &self,
        key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(Bytes, Bytes)>> {
        let mut data = vec![];
        if limit == Some(0) {
            return Ok(vec![]);
        }
        let mut last_user_key = None;
        for (key, value) in self
            .inner
            .range(to_full_key_range(table_id, key_range), None)?
        {
            if key.epoch_with_gap.pure_epoch() > epoch {
                continue;
            }
            if Some(&key.user_key) != last_user_key.as_ref() {
                if let Some(value) = value {
                    data.push((Bytes::from(key.encode()), value.clone()));
                }
                last_user_key = Some(key.user_key.clone());
            }
            if let Some(limit) = limit
                && data.len() >= limit
            {
                break;
            }
        }
        Ok(data)
    }
}

#[derive(Clone)]
pub struct RangeKvStateStoreReadSnapshot<R: RangeKv> {
    inner: RangeKvStateStore<R>,
    epoch: u64,
    table_id: TableId,
}

impl<R: RangeKv> StateStoreGet for RangeKvStateStoreReadSnapshot<R> {
    async fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        _read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        self.inner
            .get_keyed_row_impl(key, self.epoch, self.table_id)
            .and_then(|option| {
                if let Some((key, value)) = option {
                    on_key_value_fn(key.to_ref(), value.as_ref()).map(Some)
                } else {
                    Ok(None)
                }
            })
    }
}

impl<R: RangeKv> StateStoreRead for RangeKvStateStoreReadSnapshot<R> {
    type Iter = RangeKvStateStoreIter<R>;
    type RevIter = RangeKvStateStoreRevIter<R>;

    async fn iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::Iter> {
        self.inner.iter_impl(key_range, self.epoch, self.table_id)
    }

    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter> {
        self.inner
            .rev_iter_impl(key_range, self.epoch, self.table_id)
    }
}

impl<R: RangeKv> StateStoreReadVector for RangeKvStateStoreReadSnapshot<R> {
    async fn nearest<'a, O: Send + 'a>(
        &'a self,
        vec: VectorRef<'a>,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
    ) -> StorageResult<Vec<O>> {
        fn nearest_impl<'a, M: MeasureDistanceBuilder, O>(
            store: &'a InMemVectorStore,
            epoch: u64,
            table_id: TableId,
            vec: VectorRef<'a>,
            options: VectorNearestOptions,
            on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
        ) -> Vec<O> {
            let mut builder = NearestBuilder::<'_, O, M>::new(vec, options.top_n);
            builder.add(
                store
                    .read()
                    .get(&table_id)
                    .map(|vec| vec.iter())
                    .into_iter()
                    .flatten()
                    .filter(|(_, _, vector_epoch)| epoch >= *vector_epoch)
                    .map(|(vec, info, _)| (vec.to_ref(), info.as_ref())),
                on_nearest_item_fn,
            );
            builder.finish()
        }
        dispatch_distance_measurement!(options.measure, MeasurementType, {
            Ok(nearest_impl::<MeasurementType, O>(
                &self.inner.vectors,
                self.epoch,
                self.table_id,
                vec,
                options,
                on_nearest_item_fn,
            ))
        })
    }
}

impl<R: RangeKv> RangeKvStateStore<R> {
    fn get_keyed_row_impl(
        &self,
        key: TableKey<Bytes>,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<Option<StateStoreKeyedRow>> {
        let range_bounds = (Bound::Included(key.clone()), Bound::Included(key));
        // We do not really care about vnodes here, so we just use the default value.
        let res = self.scan(range_bounds, epoch, table_id, Some(1))?;

        Ok(match res.as_slice() {
            [] => None,
            [(key, value)] => Some((
                FullKey::decode(key.as_ref()).to_vec().into_bytes(),
                value.clone(),
            )),
            _ => unreachable!(),
        })
    }

    fn iter_impl(
        &self,
        key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<RangeKvStateStoreIter<R>> {
        Ok(RangeKvStateStoreIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(table_id, key_range),
                false,
            ),
            epoch,
            true,
        ))
    }

    fn rev_iter_impl(
        &self,
        key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<RangeKvStateStoreRevIter<R>> {
        Ok(RangeKvStateStoreRevIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(table_id, key_range),
                true,
            ),
            epoch,
            true,
        ))
    }
}

impl<R: RangeKv> StateStoreReadLog for RangeKvStateStore<R> {
    type ChangeLogIter = RangeKvStateStoreChangeLogIter<R>;

    async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
        loop {
            {
                let tables = self.tables.lock();
                let Some(tables) = tables.get(&options.table_id) else {
                    return Err(HummockError::next_epoch(format!(
                        "table {} not exist",
                        options.table_id
                    ))
                    .into());
                };
                if let Some(next_epoch) = tables.next_epochs.get(&epoch) {
                    break Ok(*next_epoch);
                }
            }
            yield_now().await;
        }
    }

    async fn iter_log(
        &self,
        (min_epoch, max_epoch): (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> StorageResult<Self::ChangeLogIter> {
        let new_value_iter = RangeKvStateStoreIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(options.table_id, key_range.clone()),
                false,
            ),
            max_epoch,
            true,
        );
        let old_value_iter = RangeKvStateStoreIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(options.table_id, key_range),
                false,
            ),
            min_epoch,
            false,
        );
        RangeKvStateStoreChangeLogIter::new(new_value_iter, old_value_iter)
    }
}

impl<R: RangeKv> RangeKvStateStore<R> {
    fn new_read_snapshot_impl(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> RangeKvStateStoreReadSnapshot<R> {
        RangeKvStateStoreReadSnapshot {
            inner: self.clone(),
            epoch,
            table_id,
        }
    }

    pub(crate) fn ingest_batch(
        &self,
        mut kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
        delete_ranges: Vec<(Bound<Bytes>, Bound<Bytes>)>,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<usize> {
        let mut delete_keys = BTreeSet::new();
        for del_range in delete_ranges {
            for (key, _) in self.inner.range(
                (
                    del_range
                        .0
                        .map(|table_key| FullKey::new(table_id, TableKey(table_key), epoch)),
                    del_range
                        .1
                        .map(|table_key| FullKey::new(table_id, TableKey(table_key), epoch)),
                ),
                None,
            )? {
                delete_keys.insert(key.user_key.table_key);
            }
        }
        for key in delete_keys {
            kv_pairs.push((key, StorageValue::new_delete()));
        }

        let mut size = 0;
        self.inner
            .ingest_batch(kv_pairs.into_iter().map(|(key, value)| {
                size += key.len() + value.size();
                (FullKey::new(table_id, key, epoch), value.user_value)
            }))?;
        Ok(size)
    }

    fn ingest_vectors(&self, table_id: TableId, epoch: u64, vecs: Vec<(Vector, Bytes)>) {
        self.vectors
            .write()
            .entry(table_id)
            .or_default()
            .extend(vecs.into_iter().map(|(vec, info)| (vec, info, epoch)));
    }
}

impl<R: RangeKv> StateStore for RangeKvStateStore<R> {
    type Local = RangeKvLocalStateStore<R>;
    type ReadSnapshot = RangeKvStateStoreReadSnapshot<R>;
    type VectorWriter = RangeKvLocalStateStore<R>;

    async fn try_wait_epoch(
        &self,
        _epoch: HummockReadEpoch,
        _options: TryWaitEpochOptions,
    ) -> StorageResult<()> {
        // memory backend doesn't need to wait for epoch, so this is a no-op.
        Ok(())
    }

    async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
        RangeKvLocalStateStore::new(self.clone(), option)
    }

    async fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> StorageResult<Self::ReadSnapshot> {
        Ok(self.new_read_snapshot_impl(epoch.get_epoch(), options.table_id))
    }

    async fn new_vector_writer(&self, options: NewVectorWriterOptions) -> Self::VectorWriter {
        RangeKvLocalStateStore::new(
            self.clone(),
            NewLocalOptions {
                table_id: options.table_id,
                op_consistency_level: Default::default(),
                table_option: Default::default(),
                is_replicated: false,
                vnodes: Arc::new(Bitmap::from_bool_slice(&[true])),
                upload_on_flush: true,
            },
        )
    }
}

pub struct RangeKvLocalStateStore<R: RangeKv> {
    mem_table: MemTable,
    vectors: Vec<(Vector, Bytes)>,
    inner: RangeKvStateStore<R>,

    epoch: Option<EpochPair>,

    table_id: TableId,
    op_consistency_level: OpConsistencyLevel,
    table_option: TableOption,
    vnodes: Arc<Bitmap>,
}

impl<R: RangeKv> RangeKvLocalStateStore<R> {
    pub fn new(inner: RangeKvStateStore<R>, option: NewLocalOptions) -> Self {
        Self {
            inner,
            mem_table: MemTable::new(option.table_id, option.op_consistency_level.clone()),
            epoch: None,
            table_id: option.table_id,
            op_consistency_level: option.op_consistency_level,
            table_option: option.table_option,
            vnodes: option.vnodes,
            vectors: vec![],
        }
    }

    fn epoch(&self) -> u64 {
        self.epoch.expect("should have set the epoch").curr
    }
}

impl<R: RangeKv> StateStoreGet for RangeKvLocalStateStore<R> {
    async fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        _read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        if let Some((key, value)) = match self.mem_table.buffer.get(&key) {
            None => self
                .inner
                .get_keyed_row_impl(key, self.epoch(), self.table_id)?,
            Some(op) => match op {
                KeyOp::Insert(value) | KeyOp::Update((_, value)) => Some((
                    FullKey::new(self.table_id, key, self.epoch()),
                    value.clone(),
                )),
                KeyOp::Delete(_) => None,
            },
        } {
            Ok(Some(on_key_value_fn(key.to_ref(), value.as_ref())?))
        } else {
            Ok(None)
        }
    }
}

impl<R: RangeKv> LocalStateStore for RangeKvLocalStateStore<R> {
    type FlushedSnapshotReader = RangeKvStateStoreReadSnapshot<R>;

    type Iter<'a> = impl StateStoreIter + 'a;
    type RevIter<'a> = impl StateStoreIter + 'a;

    async fn iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::Iter<'_>> {
        let iter = self
            .inner
            .iter_impl(key_range.clone(), self.epoch(), self.table_id)?;
        Ok(FromStreamStateStoreIter::new(Box::pin(merge_stream(
            self.mem_table.iter(key_range),
            iter.into_stream(to_owned_item),
            self.table_id,
            self.epoch(),
            false,
        ))))
    }

    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter<'_>> {
        let iter = self
            .inner
            .rev_iter_impl(key_range.clone(), self.epoch(), self.table_id)?;
        Ok(FromStreamStateStoreIter::new(Box::pin(merge_stream(
            self.mem_table.rev_iter(key_range),
            iter.into_stream(to_owned_item),
            self.table_id,
            self.epoch(),
            true,
        ))))
    }

    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()> {
        match old_val {
            None => self.mem_table.insert(key, new_val)?,
            Some(old_val) => self.mem_table.update(key, old_val, new_val)?,
        };
        Ok(())
    }

    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
        Ok(self.mem_table.delete(key, old_val)?)
    }

    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
        if self.vnodes.len() > 1 {
            TableState::wait_epoch(
                &self.inner.tables,
                self.table_id,
                self.epoch.expect("should have init").prev,
            )
            .await;
        }
        Ok(std::mem::replace(&mut self.vnodes, vnodes))
    }

    fn get_table_watermark(&self, _vnode: VirtualNode) -> Option<Bytes> {
        // TODO: we may store the written table watermarks to provide a correct implementation.
        None
    }

    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
        self.inner.new_read_snapshot_impl(MAX_EPOCH, self.table_id)
    }
}

impl<R: RangeKv> StateStoreWriteEpochControl for RangeKvLocalStateStore<R> {
    async fn flush(&mut self) -> StorageResult<usize> {
        let buffer = self.mem_table.drain().into_parts();
        let mut kv_pairs = Vec::with_capacity(buffer.len());
        let sanity_check_read_snapshot = if sanity_check_enabled() {
            Some(self.inner.new_read_snapshot_impl(MAX_EPOCH, self.table_id))
        } else {
            None
        };
        for (key, key_op) in buffer {
            match key_op {
                // Currently, some executors do not strictly comply with these semantics. As a
                // workaround, you may disable the check by initializing the state store with
                // `op_consistency_level=Inconsistent`.
                KeyOp::Insert(value) => {
                    if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
                        do_insert_sanity_check(
                            self.table_id,
                            &key,
                            &value,
                            sanity_check_read_snapshot,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, StorageValue::new_put(value)));
                }
                KeyOp::Delete(old_value) => {
                    if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
                        do_delete_sanity_check(
                            self.table_id,
                            &key,
                            &old_value,
                            sanity_check_read_snapshot,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, StorageValue::new_delete()));
                }
                KeyOp::Update((old_value, new_value)) => {
                    if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
                        do_update_sanity_check(
                            self.table_id,
                            &key,
                            &old_value,
                            &new_value,
                            sanity_check_read_snapshot,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, StorageValue::new_put(new_value)));
                }
            }
        }
        let epoch = self.epoch();
        self.inner
            .ingest_vectors(self.table_id, epoch, take(&mut self.vectors));
        self.inner
            .ingest_batch(kv_pairs, vec![], epoch, self.table_id)
    }

    async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
        assert_eq!(
            self.epoch.replace(options.epoch),
            None,
            "epoch in local state store of table id {:?} is initialized more than once",
            self.table_id
        );
        self.inner
            .tables
            .lock()
            .entry(self.table_id)
            .or_insert_with(|| TableState::new(options.epoch.prev))
            .next_epochs
            .insert(options.epoch.prev, options.epoch.curr);
        if self.vnodes.len() > 1 {
            TableState::wait_epoch(&self.inner.tables, self.table_id, options.epoch.prev).await;
        }

        Ok(())
    }

    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
        assert!(!self.mem_table.is_dirty());
        if let Some(value_checker) = opts.switch_op_consistency_level {
            self.mem_table.op_consistency_level.update(&value_checker);
        }
        let epoch = self
            .epoch
            .as_mut()
            .expect("should have init epoch before sealing the first epoch");
        let prev_epoch = epoch.curr;
        epoch.prev = prev_epoch;
        epoch.curr = next_epoch;
        assert!(
            next_epoch > prev_epoch,
            "new epoch {} should be greater than current epoch: {}",
            next_epoch,
            prev_epoch
        );

        let mut tables = self.inner.tables.lock();
        let table_state = tables
            .get_mut(&self.table_id)
            .expect("should be set when init");

        table_state.next_epochs.insert(prev_epoch, next_epoch);
        if self.vnodes.len() > 1 {
            let sealing_epoch_vnodes = table_state
                .sealing_epochs
                .entry(prev_epoch)
                .or_insert_with(|| BitmapBuilder::zeroed(self.vnodes.len()));
            assert_eq!(self.vnodes.len(), sealing_epoch_vnodes.len());
            for vnode in self.vnodes.iter_ones() {
                assert!(!sealing_epoch_vnodes.is_set(vnode));
                sealing_epoch_vnodes.set(vnode, true);
            }
            if (0..self.vnodes.len()).all(|vnode| sealing_epoch_vnodes.is_set(vnode)) {
                let (all_sealed_epoch, _) =
                    table_state.sealing_epochs.pop_first().expect("non-empty");
                assert_eq!(
                    all_sealed_epoch, prev_epoch,
                    "new all_sealed_epoch must be the current prev epoch"
                );
                if let Some(prev_latest_sealed_epoch) =
                    table_state.latest_sealed_epoch.replace(prev_epoch)
                {
                    assert!(prev_epoch > prev_latest_sealed_epoch);
                }
            }
        }

        if let Some((direction, watermarks, _watermark_type)) = opts.table_watermarks {
            let delete_ranges = watermarks
                .iter()
                .flat_map(|vnode_watermark| {
                    let inner_range = match direction {
                        WatermarkDirection::Ascending => {
                            (Unbounded, Excluded(vnode_watermark.watermark().clone()))
                        }
                        WatermarkDirection::Descending => {
                            (Excluded(vnode_watermark.watermark().clone()), Unbounded)
                        }
                    };
                    vnode_watermark
                        .vnode_bitmap()
                        .iter_vnodes()
                        .map(move |vnode| {
                            let (start, end) =
                                prefixed_range_with_vnode(inner_range.clone(), vnode);
                            (start.map(|key| key.0), end.map(|key| key.0))
                        })
                })
                .collect_vec();
            if let Err(e) =
                self.inner
                    .ingest_batch(Vec::new(), delete_ranges, self.epoch(), self.table_id)
            {
                error!(error = %e.as_report(), "failed to write delete ranges of table watermark");
            }
        }
    }

    async fn try_flush(&mut self) -> StorageResult<()> {
        Ok(())
    }
}

impl<R: RangeKv> StateStoreWriteVector for RangeKvLocalStateStore<R> {
    fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()> {
        self.vectors.push((vec.to_owned_scalar(), info));
        Ok(())
    }
}

pub struct RangeKvStateStoreIter<R: RangeKv> {
    inner: batched_iter::Iter<R>,

    epoch: HummockEpoch,
    is_inclusive_epoch: bool,

    last_key: Option<UserKey<Bytes>>,

    item_buffer: Option<StateStoreKeyedRow>,
}

impl<R: RangeKv> RangeKvStateStoreIter<R> {
    pub fn new(
        inner: batched_iter::Iter<R>,
        epoch: HummockEpoch,
        is_inclusive_epoch: bool,
    ) -> Self {
        Self {
            inner,
            epoch,
            is_inclusive_epoch,
            last_key: None,
            item_buffer: None,
        }
    }
}

impl<R: RangeKv> StateStoreIter for RangeKvStateStoreIter<R> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        self.next_inner()?;
        Ok(self
            .item_buffer
            .as_ref()
            .map(|(key, value)| (key.to_ref(), value.as_ref())))
    }
}

impl<R: RangeKv> RangeKvStateStoreIter<R> {
    fn next_inner(&mut self) -> StorageResult<()> {
        self.item_buffer = None;
        while let Some((key, value)) = self.inner.next()? {
            let epoch = key.epoch_with_gap.pure_epoch();
            if epoch > self.epoch {
                continue;
            }
            if epoch == self.epoch && !self.is_inclusive_epoch {
                continue;
            }
            if Some(key.user_key.as_ref()) != self.last_key.as_ref().map(|key| key.as_ref()) {
                self.last_key = Some(key.user_key.clone());
                if let Some(value) = value {
                    self.item_buffer = Some((key, value));
                    break;
                }
            }
        }
        Ok(())
    }
}

pub struct RangeKvStateStoreRevIter<R: RangeKv> {
    inner: batched_iter::Iter<R>,

    epoch: HummockEpoch,
    is_inclusive_epoch: bool,

    item_buffer: VecDeque<StateStoreKeyedRow>,
}

impl<R: RangeKv> RangeKvStateStoreRevIter<R> {
    pub fn new(
        inner: batched_iter::Iter<R>,
        epoch: HummockEpoch,
        is_inclusive_epoch: bool,
    ) -> Self {
        Self {
            inner,
            epoch,
            is_inclusive_epoch,
            item_buffer: VecDeque::default(),
        }
    }
}

impl<R: RangeKv> StateStoreIter for RangeKvStateStoreRevIter<R> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        self.next_inner()?;
        Ok(self
            .item_buffer
            .back()
            .map(|(key, value)| (key.to_ref(), value.as_ref())))
    }
}

impl<R: RangeKv> RangeKvStateStoreRevIter<R> {
    fn next_inner(&mut self) -> StorageResult<()> {
        self.item_buffer.pop_back();
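        // Reverse iteration yields user keys in descending order and, within one user key,
        // epochs in ascending order: a later entry for the same user key is newer, so it
        // supersedes (or, if it is a tombstone, invalidates) the entry buffered at the front.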
        while let Some((key, value)) = self.inner.next()? {
            let epoch = key.epoch_with_gap.pure_epoch();
            if epoch > self.epoch {
                continue;
            }
            if epoch == self.epoch && !self.is_inclusive_epoch {
                continue;
            }

            let v = match value {
                Some(v) => v,
                None => {
                    if let Some(last_key) = self.item_buffer.front()
                        && key.user_key.as_ref() == last_key.0.user_key.as_ref()
                    {
                        self.item_buffer.clear();
                    }
                    continue;
                }
            };

            if let Some(last_key) = self.item_buffer.front() {
                if key.user_key.as_ref() != last_key.0.user_key.as_ref() {
                    self.item_buffer.push_front((key, v));
                    break;
                } else {
                    self.item_buffer.pop_front();
                    self.item_buffer.push_front((key, v));
                }
            } else {
                self.item_buffer.push_front((key, v));
            }
        }
        Ok(())
    }
}

pub struct RangeKvStateStoreChangeLogIter<R: RangeKv> {
    new_value_iter: RangeKvStateStoreIter<R>,
    old_value_iter: RangeKvStateStoreIter<R>,
    item_buffer: Option<(TableKey<Bytes>, ChangeLogValue<Bytes>)>,
}

impl<R: RangeKv> RangeKvStateStoreChangeLogIter<R> {
    fn new(
        mut new_value_iter: RangeKvStateStoreIter<R>,
        mut old_value_iter: RangeKvStateStoreIter<R>,
    ) -> StorageResult<Self> {
        new_value_iter.next_inner()?;
        old_value_iter.next_inner()?;
        Ok(Self {
            new_value_iter,
            old_value_iter,
            item_buffer: None,
        })
    }
}

impl<R: RangeKv> StateStoreIter<StateStoreReadLogItem> for RangeKvStateStoreChangeLogIter<R> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreReadLogItemRef<'_>>> {
        loop {
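            // Merge the two epoch-bounded iterators by user key: a key present only in the
            // new-value iterator yields an Insert, one present only in the old-value iterator
            // a Delete, and one present in both (with differing values) an Update.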
1454            match (
1455                &self.new_value_iter.item_buffer,
1456                &self.old_value_iter.item_buffer,
1457            ) {
1458                (None, None) => {
1459                    self.item_buffer = None;
1460                    break;
1461                }
1462                (Some((key, new_value)), None) => {
1463                    self.item_buffer = Some((
1464                        key.user_key.table_key.clone(),
1465                        ChangeLogValue::Insert(new_value.clone()),
1466                    ));
1467                    self.new_value_iter.next_inner()?;
1468                }
1469                (None, Some((key, old_value))) => {
1470                    self.item_buffer = Some((
1471                        key.user_key.table_key.clone(),
1472                        ChangeLogValue::Delete(old_value.clone()),
1473                    ));
1474                    self.old_value_iter.next_inner()?;
1475                }
1476                (Some((new_value_key, new_value)), Some((old_value_key, old_value))) => {
1477                    match new_value_key.user_key.cmp(&old_value_key.user_key) {
1478                        Ordering::Less => {
1479                            self.item_buffer = Some((
1480                                new_value_key.user_key.table_key.clone(),
1481                                ChangeLogValue::Insert(new_value.clone()),
1482                            ));
1483                            self.new_value_iter.next_inner()?;
1484                        }
1485                        Ordering::Greater => {
1486                            self.item_buffer = Some((
1487                                old_value_key.user_key.table_key.clone(),
1488                                ChangeLogValue::Delete(old_value.clone()),
1489                            ));
1490                            self.old_value_iter.next_inner()?;
1491                        }
1492                        Ordering::Equal => {
1493                            if new_value == old_value {
1494                                self.new_value_iter.next_inner()?;
1495                                self.old_value_iter.next_inner()?;
1496                                continue;
1497                            }
1498                            self.item_buffer = Some((
1499                                new_value_key.user_key.table_key.clone(),
1500                                ChangeLogValue::Update {
1501                                    new_value: new_value.clone(),
1502                                    old_value: old_value.clone(),
1503                                },
1504                            ));
1505                            self.new_value_iter.next_inner()?;
1506                            self.old_value_iter.next_inner()?;
1507                        }
1508                    }
1509                }
1510            };
1511            break;
1512        }
1513        Ok(self
1514            .item_buffer
1515            .as_ref()
1516            .map(|(key, value)| (key.to_ref(), value.to_ref())))
1517    }
1518}
1519
1520#[cfg(test)]
1521mod tests {
1522    use risingwave_common::util::epoch::test_epoch;
1523
1524    use super::*;
1525    use crate::hummock::iterator::test_utils::{
1526        iterator_test_table_key_of, iterator_test_value_of,
1527    };
1528    use crate::hummock::test_utils::{ReadOptions, *};
1529    use crate::memory::sled::SledStateStore;
1530
    #[tokio::test]
    async fn test_snapshot_isolation_memory() {
        let state_store = MemoryStateStore::new();
        test_snapshot_isolation_inner(state_store).await;
    }

    #[cfg(not(madsim))]
    #[tokio::test]
    async fn test_snapshot_isolation_sled() {
        let state_store = SledStateStore::new_temp();
        test_snapshot_isolation_inner(state_store).await;
    }

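    // Writes two versions of keys `a` and `b` at epoch 0 and `test_epoch(1)`,
    // then checks that scans and point gets pinned to each epoch observe only
    // that epoch's view of the data.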
    async fn test_snapshot_isolation_inner(state_store: RangeKvStateStore<impl RangeKv>) {
        state_store
            .ingest_batch(
                vec![
                    (
                        TableKey(Bytes::from(b"a".to_vec())),
                        StorageValue::new_put(b"v1".to_vec()),
                    ),
                    (
                        TableKey(Bytes::from(b"b".to_vec())),
                        StorageValue::new_put(b"v1".to_vec()),
                    ),
                ],
                vec![],
                0,
                Default::default(),
            )
            .unwrap();
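        // Overwrite `a` and delete `b` at a later epoch; the epoch-0 snapshot
        // must remain readable afterwards.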
        state_store
            .ingest_batch(
                vec![
                    (
                        TableKey(Bytes::from(b"a".to_vec())),
                        StorageValue::new_put(b"v2".to_vec()),
                    ),
                    (
                        TableKey(Bytes::from(b"b".to_vec())),
                        StorageValue::new_delete(),
                    ),
                ],
                vec![],
                test_epoch(1),
                Default::default(),
            )
            .unwrap();
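        // A scan at epoch 0 still sees both keys with their original values.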
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    0,
                    TableId::default(),
                    None,
                )
                .unwrap(),
            vec![
                (
                    FullKey::for_test(Default::default(), Bytes::from("a"), 0)
                        .encode()
                        .into(),
                    b"v1".to_vec().into()
                ),
                (
                    FullKey::for_test(Default::default(), Bytes::from("b"), 0)
                        .encode()
                        .into(),
                    b"v1".to_vec().into()
                )
            ]
        );
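        // The same scan with `limit = Some(1)` returns only the first key.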
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    0,
                    TableId::default(),
                    Some(1),
                )
                .unwrap(),
            vec![(
                FullKey::for_test(Default::default(), b"a".to_vec(), 0)
                    .encode()
                    .into(),
                b"v1".to_vec().into()
            )]
        );
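        // At the later epoch, `b` is deleted and `a` carries the new value.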
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    test_epoch(1),
                    TableId::default(),
                    None,
                )
                .unwrap(),
            vec![(
                FullKey::for_test(Default::default(), b"a".to_vec(), test_epoch(1))
                    .encode()
                    .into(),
                b"v2".to_vec().into()
            )]
        );
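        // Point gets mirror the scans: epoch-0 reads see the original values,
        // reads at the later epoch see the overwrite and the delete.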
        assert_eq!(
            state_store
                .get(TableKey(Bytes::from("a")), 0, ReadOptions::default())
                .await
                .unwrap(),
            Some(Bytes::from("v1"))
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"b")),
                    0,
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            Some(b"v1".to_vec().into())
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"c")),
                    0,
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            None
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"a")),
                    test_epoch(1),
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            Some(b"v2".to_vec().into())
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::from("b")),
                    test_epoch(1),
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            None
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::from("c")),
                    test_epoch(1),
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            None
        );
    }

    #[tokio::test]
    async fn test_iter_log_memory() {
        let state_store = MemoryStateStore::new();
        test_iter_log_inner(state_store).await;
    }

    #[cfg(not(madsim))]
    #[tokio::test]
    async fn test_iter_log_sled() {
        let state_store = SledStateStore::new_temp();
        test_iter_log_inner(state_store).await;
    }

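    // Exercises `iter_log`: ingests keys 1, 2, 4 at epoch1, then an update, a
    // delete, and a fresh insert at epoch2, and checks the change log at each
    // epoch and over the merged epoch range.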
    async fn test_iter_log_inner(state_store: RangeKvStateStore<impl RangeKv>) {
        let table_id = TableId::new(233);
        let epoch1 = test_epoch(1);
        let key_idx = [1, 2, 4];
        let make_key = |i| TableKey(Bytes::from(iterator_test_table_key_of(i)));
        let make_value = |i| Bytes::from(iterator_test_value_of(i));
        state_store
            .ingest_batch(
                key_idx
                    .iter()
                    .map(|i| (make_key(*i), StorageValue::new_put(make_value(*i))))
                    .collect(),
                vec![],
                epoch1,
                table_id,
            )
            .unwrap();
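        // At epoch1 every key is newly written, so the log is pure Inserts.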
        {
            let mut iter = state_store
                .iter_log(
                    (epoch1, epoch1),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            for i in key_idx {
                let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(make_key(i).to_ref(), iter_key);
                assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }

        let epoch2 = test_epoch(2);
        state_store
            .ingest_batch(
                vec![
                    (make_key(1), StorageValue::new_put(make_value(12))), // update
                    (make_key(2), StorageValue::new_delete()),            // delete
                    (make_key(3), StorageValue::new_put(make_value(3))),  // insert
                ],
                vec![],
                epoch2,
                table_id,
            )
            .unwrap();

        // Check the iter log of the second epoch alone.
        {
            let expected = vec![
                (
                    make_key(1),
                    ChangeLogValue::Update {
                        new_value: make_value(12),
                        old_value: make_value(1),
                    },
                ),
                (make_key(2), ChangeLogValue::Delete(make_value(2))),
                (make_key(3), ChangeLogValue::Insert(make_value(3))),
            ];
            let mut iter = state_store
                .iter_log(
                    (epoch2, epoch2),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            for (key, change_log_value) in expected {
                let (iter_key, iter_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(
                    key.to_ref(),
                    iter_key,
                    "{:?} {:?}",
                    change_log_value.to_ref(),
                    iter_value
                );
                assert_eq!(change_log_value.to_ref(), iter_value);
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }
        // Check that the iter log of the original epoch is unchanged.
        {
            let mut iter = state_store
                .iter_log(
                    (epoch1, epoch1),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            for i in key_idx {
                let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(make_key(i).to_ref(), iter_key);
                assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }
        // Check the iter log over the merged range (epoch1, epoch2): key 1 was
        // inserted then updated, netting to an Insert of the final value; key 2
        // was inserted then deleted, so it drops out; keys 3 and 4 net to Inserts.
        {
            let mut iter = state_store
                .iter_log(
                    (epoch1, epoch2),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
            assert_eq!(make_key(1).to_ref(), iter_key);
            assert_eq!(
                change_value,
                ChangeLogValue::Insert(make_value(12).as_ref())
            );
            for i in [3, 4] {
                let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(make_key(i).to_ref(), iter_key);
                assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }
    }
}