use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::mem::take;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::ops::{Bound, RangeBounds};
use std::sync::{Arc, LazyLock};

use bytes::Bytes;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::array::VectorRef;
use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::dispatch_distance_measurement;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::types::ScalarRef;
use risingwave_common::util::epoch::{EpochPair, MAX_EPOCH};
use risingwave_hummock_sdk::key::{
    FullKey, TableKey, TableKeyRange, UserKey, prefixed_range_with_vnode,
};
use risingwave_hummock_sdk::table_watermark::WatermarkDirection;
use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
use thiserror_ext::AsReport;
use tokio::task::yield_now;
use tracing::error;

use crate::error::StorageResult;
use crate::hummock::HummockError;
use crate::hummock::utils::{
    do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, merge_stream,
    sanity_check_enabled,
};
use crate::mem_table::{KeyOp, MemTable};
use crate::storage_value::StorageValue;
use crate::store::*;
use crate::vector::{MeasureDistanceBuilder, NearestBuilder};

pub type BytesFullKey = FullKey<Bytes>;
pub type BytesFullKeyRange = (Bound<BytesFullKey>, Bound<BytesFullKey>);

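/// Storage backend abstraction for [`RangeKvStateStore`]: a range-scannable map from
/// [`BytesFullKey`] to an optional value, where `None` marks a deletion tombstone.
/// Implementations provide forward and reverse range scans, batch ingestion, and flush.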
#[allow(clippy::type_complexity)]
pub trait RangeKv: Clone + Send + Sync + 'static {
    fn range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>>;

    fn rev_range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>>;

    fn ingest_batch(
        &self,
        kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
    ) -> StorageResult<()>;

    fn flush(&self) -> StorageResult<()>;
}

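/// The default in-memory backend: a [`BTreeMap`] shared through an `Arc<RwLock<_>>`.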
pub type BTreeMapRangeKv = Arc<RwLock<BTreeMap<BytesFullKey, Option<Bytes>>>>;

impl RangeKv for BTreeMapRangeKv {
    fn range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
        let limit = limit.unwrap_or(usize::MAX);
        Ok(self
            .read()
            .range(range)
            .take(limit)
            .map(|(key, value)| (key.clone(), value.clone()))
            .collect())
    }

    fn rev_range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
        let limit = limit.unwrap_or(usize::MAX);
        Ok(self
            .read()
            .range(range)
            .rev()
            .take(limit)
            .map(|(key, value)| (key.clone(), value.clone()))
            .collect())
    }

    fn ingest_batch(
        &self,
        kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
    ) -> StorageResult<()> {
        let mut inner = self.write();
        for (key, value) in kv_pairs {
            inner.insert(key, value);
        }
        Ok(())
    }

    fn flush(&self) -> StorageResult<()> {
        Ok(())
    }
}

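/// A [`RangeKv`] implementation backed by the `sled` embedded key-value store, together with
/// `SledStateStore`, the [`RangeKvStateStore`] built on top of it.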
pub mod sled {
    use std::fs::create_dir_all;
    use std::ops::RangeBounds;

    use bytes::Bytes;
    use risingwave_hummock_sdk::key::FullKey;

    use crate::error::StorageResult;
    use crate::memory::{BytesFullKey, BytesFullKeyRange, RangeKv, RangeKvStateStore};

    #[derive(Clone)]
    pub struct SledRangeKv {
        inner: sled::Db,
    }

    impl SledRangeKv {
        pub fn new(path: impl AsRef<std::path::Path>) -> Self {
            SledRangeKv {
                inner: sled::open(path).expect("open"),
            }
        }

        pub fn new_temp() -> Self {
            create_dir_all("./.risingwave/sled").expect("should create");
            let path = tempfile::TempDir::new_in("./.risingwave/sled")
                .expect("find temp dir")
                .keep();
            Self::new(path)
        }
    }

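    // Each stored value carries a one-byte tag: `NON_EMPTY` followed by the payload for a
    // present value, or a single `EMPTY` byte for a tombstone (`None`).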
    const EMPTY: u8 = 1;
    const NON_EMPTY: u8 = 0;

    impl RangeKv for SledRangeKv {
        fn range(
            &self,
            range: BytesFullKeyRange,
            limit: Option<usize>,
        ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
            let (left, right) = range;
            let full_key_ref_bound = (
                left.as_ref().map(FullKey::to_ref),
                right.as_ref().map(FullKey::to_ref),
            );
            let left_encoded = left.as_ref().map(|key| key.to_ref().encode_reverse_epoch());
            let right_encoded = right
                .as_ref()
                .map(|key| key.to_ref().encode_reverse_epoch());
            let limit = limit.unwrap_or(usize::MAX);
            let mut ret = vec![];
            for result in self.inner.range((left_encoded, right_encoded)).take(limit) {
                let (key, value) = result?;
                let full_key = FullKey::decode_reverse_epoch(key.as_ref()).copy_into();
                if !full_key_ref_bound.contains(&full_key.to_ref()) {
                    continue;
                }
                let value = match value.as_ref() {
                    [EMPTY] => None,
                    [NON_EMPTY, rest @ ..] => Some(Bytes::from(Vec::from(rest))),
                    _ => unreachable!("malformed value: {:?}", value),
                };
                ret.push((full_key, value))
            }
            Ok(ret)
        }

        fn rev_range(
            &self,
            range: BytesFullKeyRange,
            limit: Option<usize>,
        ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
            let (left, right) = range;
            let full_key_ref_bound = (
                left.as_ref().map(FullKey::to_ref),
                right.as_ref().map(FullKey::to_ref),
            );
            let left_encoded = left.as_ref().map(|key| key.to_ref().encode_reverse_epoch());
            let right_encoded = right
                .as_ref()
                .map(|key| key.to_ref().encode_reverse_epoch());
            let limit = limit.unwrap_or(usize::MAX);
            let mut ret = vec![];
            for result in self
                .inner
                .range((left_encoded, right_encoded))
                .rev()
                .take(limit)
            {
                let (key, value) = result?;
                let full_key = FullKey::decode_reverse_epoch(key.as_ref()).copy_into();
                if !full_key_ref_bound.contains(&full_key.to_ref()) {
                    continue;
                }
                let value = match value.as_ref() {
                    [EMPTY] => None,
                    [NON_EMPTY, rest @ ..] => Some(Bytes::from(Vec::from(rest))),
                    _ => unreachable!("malformed value: {:?}", value),
                };
                ret.push((full_key, value))
            }
            Ok(ret)
        }

        fn ingest_batch(
            &self,
            kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
        ) -> StorageResult<()> {
            let mut batch = sled::Batch::default();
            for (key, value) in kv_pairs {
                let encoded_key = key.encode_reverse_epoch();
                let key = sled::IVec::from(encoded_key);
                let mut buffer =
                    Vec::with_capacity(value.as_ref().map(|v| v.len()).unwrap_or_default() + 1);
                if let Some(value) = value {
                    buffer.push(NON_EMPTY);
                    buffer.extend_from_slice(value.as_ref());
                } else {
                    buffer.push(EMPTY);
                }
                let value = sled::IVec::from(buffer);
                batch.insert(key, value);
            }
            self.inner.apply_batch(batch)?;
            Ok(())
        }

        fn flush(&self) -> StorageResult<()> {
            Ok(self.inner.flush().map(|_| {})?)
        }
    }

    pub type SledStateStore = RangeKvStateStore<SledRangeKv>;

    impl SledStateStore {
        pub fn new(path: impl AsRef<std::path::Path>) -> Self {
            RangeKvStateStore {
                inner: SledRangeKv::new(path),
                tables: Default::default(),
                vectors: Default::default(),
            }
        }

        pub fn new_temp() -> Self {
            RangeKvStateStore {
                inner: SledRangeKv::new_temp(),
                tables: Default::default(),
                vectors: Default::default(),
            }
        }
    }

    #[cfg(test)]
    mod test {
        use std::ops::{Bound, RangeBounds};

        use bytes::Bytes;
        use risingwave_common::catalog::TableId;
        use risingwave_common::util::epoch::EPOCH_SPILL_TIME_MASK;
        use risingwave_hummock_sdk::EpochWithGap;
        use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};

        use crate::memory::RangeKv;
        use crate::memory::sled::SledRangeKv;

        #[test]
        fn test_filter_variable_key_length_false_positive() {
            let table_id = TableId { table_id: 233 };
            let epoch = u64::MAX - u64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]);
            let excluded_short_table_key = [0, 1, 0, 0];
            let included_long_table_key = [0, 1, 0, 0, 1, 2];
            let left_table_key = [0, 1, 0, 0, 1];
            let right_table_key = [0, 1, 1, 1];

            let to_full_key = |table_key: &[u8]| FullKey {
                user_key: UserKey {
                    table_id,
                    table_key: TableKey(Bytes::from(table_key.to_vec())),
                },
                epoch_with_gap: EpochWithGap::new_from_epoch(epoch & !EPOCH_SPILL_TIME_MASK),
            };

            let left_full_key = to_full_key(&left_table_key[..]);
            let right_full_key = to_full_key(&right_table_key[..]);
            let included_long_full_key = to_full_key(&included_long_table_key[..]);
            let excluded_short_full_key = to_full_key(&excluded_short_table_key[..]);

            assert!(
                (
                    Bound::Included(left_full_key.to_ref()),
                    Bound::Included(right_full_key.to_ref())
                )
                    .contains(&included_long_full_key.to_ref())
            );
            assert!(
                !(
                    Bound::Included(left_full_key.to_ref()),
                    Bound::Included(right_full_key.to_ref())
                )
                    .contains(&excluded_short_full_key.to_ref())
            );

            let left_encoded = left_full_key.encode_reverse_epoch();
            let right_encoded = right_full_key.encode_reverse_epoch();

            assert!(
                (
                    Bound::Included(left_encoded.clone()),
                    Bound::Included(right_encoded.clone())
                )
                    .contains(&included_long_full_key.encode_reverse_epoch())
            );
            assert!(
                (
                    Bound::Included(left_encoded),
                    Bound::Included(right_encoded)
                )
                    .contains(&excluded_short_full_key.encode_reverse_epoch())
            );

            let sled_range_kv = SledRangeKv::new_temp();
            sled_range_kv
                .ingest_batch(
                    vec![
                        (included_long_full_key.clone(), None),
                        (excluded_short_full_key, None),
                    ]
                    .into_iter(),
                )
                .unwrap();
            let kvs = sled_range_kv
                .range(
                    (
                        Bound::Included(left_full_key),
                        Bound::Included(right_full_key),
                    ),
                    None,
                )
                .unwrap();
            assert_eq!(1, kvs.len());
            assert_eq!(included_long_full_key.to_ref(), kvs[0].0.to_ref());
            assert!(kvs[0].1.is_none());
        }
    }
}

mod batched_iter {

    use super::*;

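    /// Iterator over a [`RangeKv`] that fetches results in batches of up to `BATCH_SIZE`
    /// entries. After each batch, the corresponding range bound is advanced past the last
    /// returned key, so each call only performs a short, bounded scan on the backend.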
    pub struct Iter<R: RangeKv> {
        inner: R,
        range: BytesFullKeyRange,
        current: std::vec::IntoIter<(FullKey<Bytes>, Option<Bytes>)>,
        rev: bool,
    }

    impl<R: RangeKv> Iter<R> {
        pub fn new(inner: R, range: BytesFullKeyRange, rev: bool) -> Self {
            Self {
                inner,
                range,
                rev,
                current: Vec::new().into_iter(),
            }
        }
    }

    impl<R: RangeKv> Iter<R> {
        const BATCH_SIZE: usize = 256;

        fn refill(&mut self) -> StorageResult<()> {
            assert!(self.current.is_empty());

            let batch = if self.rev {
                self.inner.rev_range(
                    (self.range.0.clone(), self.range.1.clone()),
                    Some(Self::BATCH_SIZE),
                )?
            } else {
                self.inner.range(
                    (self.range.0.clone(), self.range.1.clone()),
                    Some(Self::BATCH_SIZE),
                )?
            };

            if let Some((last_key, _)) = batch.last() {
                let full_key = FullKey::new_with_gap_epoch(
                    last_key.user_key.table_id,
                    TableKey(last_key.user_key.table_key.0.clone()),
                    last_key.epoch_with_gap,
                );
                if self.rev {
                    self.range.1 = Bound::Excluded(full_key);
                } else {
                    self.range.0 = Bound::Excluded(full_key);
                }
            }
            self.current = batch.into_iter();
            Ok(())
        }
    }

    impl<R: RangeKv> Iter<R> {
        #[allow(clippy::type_complexity)]
        pub fn next(&mut self) -> StorageResult<Option<(BytesFullKey, Option<Bytes>)>> {
            match self.current.next() {
                Some((key, value)) => Ok(Some((key, value))),
                None => {
                    self.refill()?;
                    Ok(self.current.next())
                }
            }
        }
    }

    #[cfg(test)]
    mod tests {
        use rand::Rng;

        use super::*;
        use crate::memory::sled::SledRangeKv;

        #[test]
        fn test_btreemap_iter_chaos() {
            let map = Arc::new(RwLock::new(BTreeMap::new()));
            test_iter_chaos_inner(map, 1000);
        }

        #[cfg(not(madsim))]
        #[test]
        fn test_sled_iter_chaos() {
            let map = SledRangeKv::new_temp();
            test_iter_chaos_inner(map, 100);
        }

        fn test_iter_chaos_inner(map: impl RangeKv, count: usize) {
            let key_range = 1..=10000;
            let num_to_bytes = |k: i32| Bytes::from(format!("{:06}", k).as_bytes().to_vec());
            let num_to_full_key =
                |k: i32| FullKey::new(TableId::default(), TableKey(num_to_bytes(k)), 0);
            #[allow(clippy::mutable_key_type)]
            map.ingest_batch(key_range.clone().map(|k| {
                let key = num_to_full_key(k);
                let b = key.user_key.table_key.0.clone();

                (key, Some(b))
            }))
            .unwrap();

            let rand_bound = || {
                let key = rand::rng().random_range(key_range.clone());
                let key = num_to_full_key(key);
                match rand::rng().random_range(1..=5) {
                    1 | 2 => Bound::Included(key),
                    3 | 4 => Bound::Excluded(key),
                    _ => Bound::Unbounded,
                }
            };

            for _ in 0..count {
                let range = loop {
                    let range = (rand_bound(), rand_bound());
                    let (start, end) = (range.start_bound(), range.end_bound());

                    match (start, end) {
                        (Bound::Excluded(s), Bound::Excluded(e)) if s == e => {
                            continue;
                        }
                        (
                            Bound::Included(s) | Bound::Excluded(s),
                            Bound::Included(e) | Bound::Excluded(e),
                        ) if s > e => {
                            continue;
                        }
                        _ => break range,
                    }
                };

                let v1 = {
                    let mut v = vec![];
                    let mut iter = Iter::new(map.clone(), range.clone(), false);
                    while let Some((key, value)) = iter.next().unwrap() {
                        v.push((key, value));
                    }
                    v
                };
                let v2 = map.range(range, None).unwrap();

                assert_eq!(v1, v2);
            }
        }
    }
}

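/// An in-memory state store: [`RangeKvStateStore`] over the [`BTreeMap`]-backed
/// [`BTreeMapRangeKv`]. Written versions are kept in memory and never garbage-collected,
/// so memory usage grows with every write, which makes it best suited to tests and small
/// workloads.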
pub type MemoryStateStore = RangeKvStateStore<BTreeMapRangeKv>;

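/// Per-table epoch bookkeeping: the epoch the table was initialized with, the successor of each
/// known epoch, the latest epoch sealed by all vnodes, and the vnode bitmaps of epochs that are
/// still being sealed.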
struct TableState {
    init_epoch: u64,
    next_epochs: BTreeMap<u64, u64>,
    latest_sealed_epoch: Option<u64>,
    sealing_epochs: BTreeMap<u64, BitmapBuilder>,
}

impl TableState {
    fn new(init_epoch: u64) -> Self {
        Self {
            init_epoch,
            next_epochs: Default::default(),
            latest_sealed_epoch: None,
            sealing_epochs: Default::default(),
        }
    }

    async fn wait_epoch(
        tables: &parking_lot::Mutex<HashMap<TableId, Self>>,
        table_id: TableId,
        epoch: u64,
    ) {
        loop {
            {
                let tables = tables.lock();
                let table_state = tables.get(&table_id).expect("should exist");
                assert!(epoch >= table_state.init_epoch);
                if epoch == table_state.init_epoch {
                    return;
                }
                if let Some(latest_sealed_epoch) = table_state.latest_sealed_epoch
                    && latest_sealed_epoch >= epoch
                {
                    return;
                }
            }
            yield_now().await;
        }
    }
}

type InMemVectorStore = Arc<RwLock<HashMap<TableId, Vec<(Vector, Bytes, u64)>>>>;

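/// A state store implementation generic over a [`RangeKv`] backend. MVCC versions are stored as
/// [`FullKey`]s (user key plus epoch) mapped to optional values; per-table epoch progress is
/// tracked in `tables`, and vector writes live in a separate in-memory vector store.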
#[derive(Clone, Default)]
pub struct RangeKvStateStore<R: RangeKv> {
    inner: R,
    tables: Arc<parking_lot::Mutex<HashMap<TableId, TableState>>>,

    vectors: InMemVectorStore,
}

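/// Converts a table-local key range into a [`BytesFullKeyRange`] for `table_id`. The epoch
/// components are chosen so that inclusive/exclusive bounds keep their semantics under full-key
/// ordering (all epochs of an included key fall inside the range, none of an excluded key do),
/// and an unbounded end is clamped to the beginning of the next table id.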
fn to_full_key_range<R, B>(table_id: TableId, table_key_range: R) -> BytesFullKeyRange
where
    R: RangeBounds<B> + Send,
    B: AsRef<[u8]>,
{
    let start = match table_key_range.start_bound() {
        Included(k) => Included(FullKey::new(
            table_id,
            TableKey(Bytes::from(k.as_ref().to_vec())),
            HummockEpoch::MAX,
        )),
        Excluded(k) => Excluded(FullKey::new(
            table_id,
            TableKey(Bytes::from(k.as_ref().to_vec())),
            0,
        )),
        Unbounded => Included(FullKey::new(
            table_id,
            TableKey(Bytes::from(b"".to_vec())),
            HummockEpoch::MAX,
        )),
    };
    let end = match table_key_range.end_bound() {
        Included(k) => Included(FullKey::new(
            table_id,
            TableKey(Bytes::from(k.as_ref().to_vec())),
            0,
        )),
        Excluded(k) => Excluded(FullKey::new(
            table_id,
            TableKey(Bytes::from(k.as_ref().to_vec())),
            HummockEpoch::MAX,
        )),
        Unbounded => {
            if let Some(next_table_id) = table_id.table_id().checked_add(1) {
                Excluded(FullKey::new(
                    next_table_id.into(),
                    TableKey(Bytes::from(b"".to_vec())),
                    HummockEpoch::MAX,
                ))
            } else {
                Unbounded
            }
        }
    };
    (start, end)
}

impl MemoryStateStore {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn shared() -> Self {
        static STORE: LazyLock<MemoryStateStore> = LazyLock::new(MemoryStateStore::new);
        STORE.clone()
    }
}

impl<R: RangeKv> RangeKvStateStore<R> {
    fn scan(
        &self,
        key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(Bytes, Bytes)>> {
        let mut data = vec![];
        if limit == Some(0) {
            return Ok(vec![]);
        }
        let mut last_user_key = None;
        for (key, value) in self
            .inner
            .range(to_full_key_range(table_id, key_range), None)?
        {
            if key.epoch_with_gap.pure_epoch() > epoch {
                continue;
            }
            if Some(&key.user_key) != last_user_key.as_ref() {
                if let Some(value) = value {
                    data.push((Bytes::from(key.encode()), value.clone()));
                }
                last_user_key = Some(key.user_key.clone());
            }
            if let Some(limit) = limit
                && data.len() >= limit
            {
                break;
            }
        }
        Ok(data)
    }
}

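/// A read snapshot of a [`RangeKvStateStore`], pinned to a single epoch and table id.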
#[derive(Clone)]
pub struct RangeKvStateStoreReadSnapshot<R: RangeKv> {
    inner: RangeKvStateStore<R>,
    epoch: u64,
    table_id: TableId,
}

impl<R: RangeKv> StateStoreGet for RangeKvStateStoreReadSnapshot<R> {
    async fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        _read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        self.inner
            .get_keyed_row_impl(key, self.epoch, self.table_id)
            .and_then(|option| {
                if let Some((key, value)) = option {
                    on_key_value_fn(key.to_ref(), value.as_ref()).map(Some)
                } else {
                    Ok(None)
                }
            })
    }
}

impl<R: RangeKv> StateStoreRead for RangeKvStateStoreReadSnapshot<R> {
    type Iter = RangeKvStateStoreIter<R>;
    type RevIter = RangeKvStateStoreRevIter<R>;

    async fn iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::Iter> {
        self.inner.iter_impl(key_range, self.epoch, self.table_id)
    }

    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter> {
        self.inner
            .rev_iter_impl(key_range, self.epoch, self.table_id)
    }
}

impl<R: RangeKv> StateStoreReadVector for RangeKvStateStoreReadSnapshot<R> {
    async fn nearest<'a, O: Send + 'a>(
        &'a self,
        vec: VectorRef<'a>,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
    ) -> StorageResult<Vec<O>> {
        fn nearest_impl<'a, M: MeasureDistanceBuilder, O>(
            store: &'a InMemVectorStore,
            epoch: u64,
            table_id: TableId,
            vec: VectorRef<'a>,
            options: VectorNearestOptions,
            on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
        ) -> Vec<O> {
            let mut builder = NearestBuilder::<'_, O, M>::new(vec, options.top_n);
            builder.add(
                store
                    .read()
                    .get(&table_id)
                    .map(|vec| vec.iter())
                    .into_iter()
                    .flatten()
                    .filter(|(_, _, vector_epoch)| epoch >= *vector_epoch)
                    .map(|(vec, info, _)| (vec.to_ref(), info.as_ref())),
                on_nearest_item_fn,
            );
            builder.finish()
        }
        dispatch_distance_measurement!(options.measure, MeasurementType, {
            Ok(nearest_impl::<MeasurementType, O>(
                &self.inner.vectors,
                self.epoch,
                self.table_id,
                vec,
                options,
                on_nearest_item_fn,
            ))
        })
    }
}

impl<R: RangeKv> RangeKvStateStore<R> {
    fn get_keyed_row_impl(
        &self,
        key: TableKey<Bytes>,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<Option<StateStoreKeyedRow>> {
        let range_bounds = (Bound::Included(key.clone()), Bound::Included(key));
        let res = self.scan(range_bounds, epoch, table_id, Some(1))?;

        Ok(match res.as_slice() {
            [] => None,
            [(key, value)] => Some((
                FullKey::decode(key.as_ref()).to_vec().into_bytes(),
                value.clone(),
            )),
            _ => unreachable!(),
        })
    }

    fn iter_impl(
        &self,
        key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<RangeKvStateStoreIter<R>> {
        Ok(RangeKvStateStoreIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(table_id, key_range),
                false,
            ),
            epoch,
            true,
        ))
    }

    fn rev_iter_impl(
        &self,
        key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<RangeKvStateStoreRevIter<R>> {
        Ok(RangeKvStateStoreRevIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(table_id, key_range),
                true,
            ),
            epoch,
            true,
        ))
    }
}

impl<R: RangeKv> StateStoreReadLog for RangeKvStateStore<R> {
    type ChangeLogIter = RangeKvStateStoreChangeLogIter<R>;

    async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
        loop {
            {
                let tables = self.tables.lock();
                let Some(tables) = tables.get(&options.table_id) else {
                    return Err(HummockError::next_epoch(format!(
                        "table {} not exist",
                        options.table_id
                    ))
                    .into());
                };
                if let Some(next_epoch) = tables.next_epochs.get(&epoch) {
                    break Ok(*next_epoch);
                }
            }
            yield_now().await;
        }
    }

    async fn iter_log(
        &self,
        (min_epoch, max_epoch): (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> StorageResult<Self::ChangeLogIter> {
        let new_value_iter = RangeKvStateStoreIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(options.table_id, key_range.clone()),
                false,
            ),
            max_epoch,
            true,
        );
        let old_value_iter = RangeKvStateStoreIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(options.table_id, key_range),
                false,
            ),
            min_epoch,
            false,
        );
        RangeKvStateStoreChangeLogIter::new(new_value_iter, old_value_iter)
    }
}

impl<R: RangeKv> RangeKvStateStore<R> {
    fn new_read_snapshot_impl(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> RangeKvStateStoreReadSnapshot<R> {
        RangeKvStateStoreReadSnapshot {
            inner: self.clone(),
            epoch,
            table_id,
        }
    }

    pub(crate) fn ingest_batch(
        &self,
        mut kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
        delete_ranges: Vec<(Bound<Bytes>, Bound<Bytes>)>,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<usize> {
        let mut delete_keys = BTreeSet::new();
        for del_range in delete_ranges {
            for (key, _) in self.inner.range(
                (
                    del_range
                        .0
                        .map(|table_key| FullKey::new(table_id, TableKey(table_key), epoch)),
                    del_range
                        .1
                        .map(|table_key| FullKey::new(table_id, TableKey(table_key), epoch)),
                ),
                None,
            )? {
                delete_keys.insert(key.user_key.table_key);
            }
        }
        for key in delete_keys {
            kv_pairs.push((key, StorageValue::new_delete()));
        }

        let mut size = 0;
        self.inner
            .ingest_batch(kv_pairs.into_iter().map(|(key, value)| {
                size += key.len() + value.size();
                (FullKey::new(table_id, key, epoch), value.user_value)
            }))?;
        Ok(size)
    }

    fn ingest_vectors(&self, table_id: TableId, epoch: u64, vecs: Vec<(Vector, Bytes)>) {
        self.vectors
            .write()
            .entry(table_id)
            .or_default()
            .extend(vecs.into_iter().map(|(vec, info)| (vec, info, epoch)));
    }
}

impl<R: RangeKv> StateStore for RangeKvStateStore<R> {
    type Local = RangeKvLocalStateStore<R>;
    type ReadSnapshot = RangeKvStateStoreReadSnapshot<R>;
    type VectorWriter = RangeKvLocalStateStore<R>;

    async fn try_wait_epoch(
        &self,
        _epoch: HummockReadEpoch,
        _options: TryWaitEpochOptions,
    ) -> StorageResult<()> {
        Ok(())
    }

    async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
        RangeKvLocalStateStore::new(self.clone(), option)
    }

    async fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> StorageResult<Self::ReadSnapshot> {
        Ok(self.new_read_snapshot_impl(epoch.get_epoch(), options.table_id))
    }

    async fn new_vector_writer(&self, options: NewVectorWriterOptions) -> Self::VectorWriter {
        RangeKvLocalStateStore::new(
            self.clone(),
            NewLocalOptions {
                table_id: options.table_id,
                op_consistency_level: Default::default(),
                table_option: Default::default(),
                is_replicated: false,
                vnodes: Arc::new(Bitmap::from_bool_slice(&[true])),
                upload_on_flush: true,
            },
        )
    }
}

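/// The local state store over a [`RangeKvStateStore`]: writes are staged in a [`MemTable`]
/// (and vector inserts in a local buffer) and are only applied to the shared store on `flush`.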
pub struct RangeKvLocalStateStore<R: RangeKv> {
    mem_table: MemTable,
    vectors: Vec<(Vector, Bytes)>,
    inner: RangeKvStateStore<R>,

    epoch: Option<EpochPair>,

    table_id: TableId,
    op_consistency_level: OpConsistencyLevel,
    table_option: TableOption,
    vnodes: Arc<Bitmap>,
}

impl<R: RangeKv> RangeKvLocalStateStore<R> {
    pub fn new(inner: RangeKvStateStore<R>, option: NewLocalOptions) -> Self {
        Self {
            inner,
            mem_table: MemTable::new(option.table_id, option.op_consistency_level.clone()),
            epoch: None,
            table_id: option.table_id,
            op_consistency_level: option.op_consistency_level,
            table_option: option.table_option,
            vnodes: option.vnodes,
            vectors: vec![],
        }
    }

    fn epoch(&self) -> u64 {
        self.epoch.expect("should have set the epoch").curr
    }
}

impl<R: RangeKv> StateStoreGet for RangeKvLocalStateStore<R> {
    async fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        _read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        if let Some((key, value)) = match self.mem_table.buffer.get(&key) {
            None => self
                .inner
                .get_keyed_row_impl(key, self.epoch(), self.table_id)?,
            Some(op) => match op {
                KeyOp::Insert(value) | KeyOp::Update((_, value)) => Some((
                    FullKey::new(self.table_id, key, self.epoch()),
                    value.clone(),
                )),
                KeyOp::Delete(_) => None,
            },
        } {
            Ok(Some(on_key_value_fn(key.to_ref(), value.as_ref())?))
        } else {
            Ok(None)
        }
    }
}

impl<R: RangeKv> LocalStateStore for RangeKvLocalStateStore<R> {
    type FlushedSnapshotReader = RangeKvStateStoreReadSnapshot<R>;

    type Iter<'a> = impl StateStoreIter + 'a;
    type RevIter<'a> = impl StateStoreIter + 'a;

    async fn iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::Iter<'_>> {
        let iter = self
            .inner
            .iter_impl(key_range.clone(), self.epoch(), self.table_id)?;
        Ok(FromStreamStateStoreIter::new(Box::pin(merge_stream(
            self.mem_table.iter(key_range),
            iter.into_stream(to_owned_item),
            self.table_id,
            self.epoch(),
            false,
        ))))
    }

    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter<'_>> {
        let iter = self
            .inner
            .rev_iter_impl(key_range.clone(), self.epoch(), self.table_id)?;
        Ok(FromStreamStateStoreIter::new(Box::pin(merge_stream(
            self.mem_table.rev_iter(key_range),
            iter.into_stream(to_owned_item),
            self.table_id,
            self.epoch(),
            true,
        ))))
    }

    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()> {
        match old_val {
            None => self.mem_table.insert(key, new_val)?,
            Some(old_val) => self.mem_table.update(key, old_val, new_val)?,
        };
        Ok(())
    }

    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
        Ok(self.mem_table.delete(key, old_val)?)
    }

    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
        if self.vnodes.len() > 1 {
            TableState::wait_epoch(
                &self.inner.tables,
                self.table_id,
                self.epoch.expect("should have init").prev,
            )
            .await;
        }
        Ok(std::mem::replace(&mut self.vnodes, vnodes))
    }

    fn get_table_watermark(&self, _vnode: VirtualNode) -> Option<Bytes> {
        None
    }

    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
        self.inner.new_read_snapshot_impl(MAX_EPOCH, self.table_id)
    }
}

impl<R: RangeKv> StateStoreWriteEpochControl for RangeKvLocalStateStore<R> {
    async fn flush(&mut self) -> StorageResult<usize> {
        let buffer = self.mem_table.drain().into_parts();
        let mut kv_pairs = Vec::with_capacity(buffer.len());
        let sanity_check_read_snapshot = if sanity_check_enabled() {
            Some(self.inner.new_read_snapshot_impl(MAX_EPOCH, self.table_id))
        } else {
            None
        };
        for (key, key_op) in buffer {
            match key_op {
                KeyOp::Insert(value) => {
                    if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
                        do_insert_sanity_check(
                            self.table_id,
                            &key,
                            &value,
                            sanity_check_read_snapshot,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, StorageValue::new_put(value)));
                }
                KeyOp::Delete(old_value) => {
                    if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
                        do_delete_sanity_check(
                            self.table_id,
                            &key,
                            &old_value,
                            sanity_check_read_snapshot,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, StorageValue::new_delete()));
                }
                KeyOp::Update((old_value, new_value)) => {
                    if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
                        do_update_sanity_check(
                            self.table_id,
                            &key,
                            &old_value,
                            &new_value,
                            sanity_check_read_snapshot,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, StorageValue::new_put(new_value)));
                }
            }
        }
        let epoch = self.epoch();
        self.inner
            .ingest_vectors(self.table_id, epoch, take(&mut self.vectors));
        self.inner
            .ingest_batch(kv_pairs, vec![], epoch, self.table_id)
    }

    async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
        assert_eq!(
            self.epoch.replace(options.epoch),
            None,
            "epoch in local state store of table id {:?} is init for more than once",
            self.table_id
        );
        self.inner
            .tables
            .lock()
            .entry(self.table_id)
            .or_insert_with(|| TableState::new(options.epoch.prev))
            .next_epochs
            .insert(options.epoch.prev, options.epoch.curr);
        if self.vnodes.len() > 1 {
            TableState::wait_epoch(&self.inner.tables, self.table_id, options.epoch.prev).await;
        }

        Ok(())
    }

    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
        assert!(!self.mem_table.is_dirty());
        if let Some(value_checker) = opts.switch_op_consistency_level {
            self.mem_table.op_consistency_level.update(&value_checker);
        }
        let epoch = self
            .epoch
            .as_mut()
            .expect("should have init epoch before seal the first epoch");
        let prev_epoch = epoch.curr;
        epoch.prev = prev_epoch;
        epoch.curr = next_epoch;
        assert!(
            next_epoch > prev_epoch,
            "new epoch {} should be greater than current epoch: {}",
            next_epoch,
            prev_epoch
        );

        let mut tables = self.inner.tables.lock();
        let table_state = tables
            .get_mut(&self.table_id)
            .expect("should be set when init");

        table_state.next_epochs.insert(prev_epoch, next_epoch);
        if self.vnodes.len() > 1 {
            let sealing_epoch_vnodes = table_state
                .sealing_epochs
                .entry(prev_epoch)
                .or_insert_with(|| BitmapBuilder::zeroed(self.vnodes.len()));
            assert_eq!(self.vnodes.len(), sealing_epoch_vnodes.len());
            for vnode in self.vnodes.iter_ones() {
                assert!(!sealing_epoch_vnodes.is_set(vnode));
                sealing_epoch_vnodes.set(vnode, true);
            }
            if (0..self.vnodes.len()).all(|vnode| sealing_epoch_vnodes.is_set(vnode)) {
                let (all_sealed_epoch, _) =
                    table_state.sealing_epochs.pop_first().expect("non-empty");
                assert_eq!(
                    all_sealed_epoch, prev_epoch,
                    "new all_sealed_epoch must be the current prev epoch"
                );
                if let Some(prev_latest_sealed_epoch) =
                    table_state.latest_sealed_epoch.replace(prev_epoch)
                {
                    assert!(prev_epoch > prev_latest_sealed_epoch);
                }
            }
        }

        if let Some((direction, watermarks, _watermark_type)) = opts.table_watermarks {
            let delete_ranges = watermarks
                .iter()
                .flat_map(|vnode_watermark| {
                    let inner_range = match direction {
                        WatermarkDirection::Ascending => {
                            (Unbounded, Excluded(vnode_watermark.watermark().clone()))
                        }
                        WatermarkDirection::Descending => {
                            (Excluded(vnode_watermark.watermark().clone()), Unbounded)
                        }
                    };
                    vnode_watermark
                        .vnode_bitmap()
                        .iter_vnodes()
                        .map(move |vnode| {
                            let (start, end) =
                                prefixed_range_with_vnode(inner_range.clone(), vnode);
                            (start.map(|key| key.0), end.map(|key| key.0))
                        })
                })
                .collect_vec();
            if let Err(e) =
                self.inner
                    .ingest_batch(Vec::new(), delete_ranges, self.epoch(), self.table_id)
            {
                error!(error = %e.as_report(), "failed to write delete ranges of table watermark");
            }
        }
    }

    async fn try_flush(&mut self) -> StorageResult<()> {
        Ok(())
    }
}

impl<R: RangeKv> StateStoreWriteVector for RangeKvLocalStateStore<R> {
    fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()> {
        self.vectors.push((vec.to_owned_scalar(), info));
        Ok(())
    }
}

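/// Forward iterator over a [`RangeKv`] backend. It skips versions newer than the read epoch
/// (or equal to it when `is_inclusive_epoch` is false), yields only the newest visible version
/// of each user key, and filters out deletion tombstones.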
pub struct RangeKvStateStoreIter<R: RangeKv> {
    inner: batched_iter::Iter<R>,

    epoch: HummockEpoch,
    is_inclusive_epoch: bool,

    last_key: Option<UserKey<Bytes>>,

    item_buffer: Option<StateStoreKeyedRow>,
}

impl<R: RangeKv> RangeKvStateStoreIter<R> {
    pub fn new(
        inner: batched_iter::Iter<R>,
        epoch: HummockEpoch,
        is_inclusive_epoch: bool,
    ) -> Self {
        Self {
            inner,
            epoch,
            is_inclusive_epoch,
            last_key: None,
            item_buffer: None,
        }
    }
}

impl<R: RangeKv> StateStoreIter for RangeKvStateStoreIter<R> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        self.next_inner()?;
        Ok(self
            .item_buffer
            .as_ref()
            .map(|(key, value)| (key.to_ref(), value.as_ref())))
    }
}

impl<R: RangeKv> RangeKvStateStoreIter<R> {
    fn next_inner(&mut self) -> StorageResult<()> {
        self.item_buffer = None;
        while let Some((key, value)) = self.inner.next()? {
            let epoch = key.epoch_with_gap.pure_epoch();
            if epoch > self.epoch {
                continue;
            }
            if epoch == self.epoch && !self.is_inclusive_epoch {
                continue;
            }
            if Some(key.user_key.as_ref()) != self.last_key.as_ref().map(|key| key.as_ref()) {
                self.last_key = Some(key.user_key.clone());
                if let Some(value) = value {
                    self.item_buffer = Some((key, value));
                    break;
                }
            }
        }
        Ok(())
    }
}

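/// Reverse iterator counterpart of [`RangeKvStateStoreIter`]. The underlying reverse scan sees
/// older versions of a user key before newer ones, so rows are buffered and replaced until the
/// newest visible (non-tombstone) version of each user key is known.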
pub struct RangeKvStateStoreRevIter<R: RangeKv> {
    inner: batched_iter::Iter<R>,

    epoch: HummockEpoch,
    is_inclusive_epoch: bool,

    item_buffer: VecDeque<StateStoreKeyedRow>,
}

impl<R: RangeKv> RangeKvStateStoreRevIter<R> {
    pub fn new(
        inner: batched_iter::Iter<R>,
        epoch: HummockEpoch,
        is_inclusive_epoch: bool,
    ) -> Self {
        Self {
            inner,
            epoch,
            is_inclusive_epoch,
            item_buffer: VecDeque::default(),
        }
    }
}

impl<R: RangeKv> StateStoreIter for RangeKvStateStoreRevIter<R> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        self.next_inner()?;
        Ok(self
            .item_buffer
            .back()
            .map(|(key, value)| (key.to_ref(), value.as_ref())))
    }
}

impl<R: RangeKv> RangeKvStateStoreRevIter<R> {
    fn next_inner(&mut self) -> StorageResult<()> {
        self.item_buffer.pop_back();
        while let Some((key, value)) = self.inner.next()? {
            let epoch = key.epoch_with_gap.pure_epoch();
            if epoch > self.epoch {
                continue;
            }
            if epoch == self.epoch && !self.is_inclusive_epoch {
                continue;
            }

            let v = match value {
                Some(v) => v,
                None => {
                    if let Some(last_key) = self.item_buffer.front()
                        && key.user_key.as_ref() == last_key.0.user_key.as_ref()
                    {
                        self.item_buffer.clear();
                    }
                    continue;
                }
            };

            if let Some(last_key) = self.item_buffer.front() {
                if key.user_key.as_ref() != last_key.0.user_key.as_ref() {
                    self.item_buffer.push_front((key, v));
                    break;
                } else {
                    self.item_buffer.pop_front();
                    self.item_buffer.push_front((key, v));
                }
            } else {
                self.item_buffer.push_front((key, v));
            }
        }
        Ok(())
    }
}

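/// Iterator over the change log between two epochs: it merges a "new value" iterator (read at
/// the upper epoch) with an "old value" iterator (read strictly below the lower epoch) and emits
/// Insert, Delete, or Update entries per user key, skipping keys whose value did not change.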
pub struct RangeKvStateStoreChangeLogIter<R: RangeKv> {
    new_value_iter: RangeKvStateStoreIter<R>,
    old_value_iter: RangeKvStateStoreIter<R>,
    item_buffer: Option<(TableKey<Bytes>, ChangeLogValue<Bytes>)>,
}

impl<R: RangeKv> RangeKvStateStoreChangeLogIter<R> {
    fn new(
        mut new_value_iter: RangeKvStateStoreIter<R>,
        mut old_value_iter: RangeKvStateStoreIter<R>,
    ) -> StorageResult<Self> {
        new_value_iter.next_inner()?;
        old_value_iter.next_inner()?;
        Ok(Self {
            new_value_iter,
            old_value_iter,
            item_buffer: None,
        })
    }
}

impl<R: RangeKv> StateStoreIter<StateStoreReadLogItem> for RangeKvStateStoreChangeLogIter<R> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreReadLogItemRef<'_>>> {
        loop {
            match (
                &self.new_value_iter.item_buffer,
                &self.old_value_iter.item_buffer,
            ) {
                (None, None) => {
                    self.item_buffer = None;
                    break;
                }
                (Some((key, new_value)), None) => {
                    self.item_buffer = Some((
                        key.user_key.table_key.clone(),
                        ChangeLogValue::Insert(new_value.clone()),
                    ));
                    self.new_value_iter.next_inner()?;
                }
                (None, Some((key, old_value))) => {
                    self.item_buffer = Some((
                        key.user_key.table_key.clone(),
                        ChangeLogValue::Delete(old_value.clone()),
                    ));
                    self.old_value_iter.next_inner()?;
                }
                (Some((new_value_key, new_value)), Some((old_value_key, old_value))) => {
                    match new_value_key.user_key.cmp(&old_value_key.user_key) {
                        Ordering::Less => {
                            self.item_buffer = Some((
                                new_value_key.user_key.table_key.clone(),
                                ChangeLogValue::Insert(new_value.clone()),
                            ));
                            self.new_value_iter.next_inner()?;
                        }
                        Ordering::Greater => {
                            self.item_buffer = Some((
                                old_value_key.user_key.table_key.clone(),
                                ChangeLogValue::Delete(old_value.clone()),
                            ));
                            self.old_value_iter.next_inner()?;
                        }
                        Ordering::Equal => {
                            if new_value == old_value {
                                self.new_value_iter.next_inner()?;
                                self.old_value_iter.next_inner()?;
                                continue;
                            }
                            self.item_buffer = Some((
                                new_value_key.user_key.table_key.clone(),
                                ChangeLogValue::Update {
                                    new_value: new_value.clone(),
                                    old_value: old_value.clone(),
                                },
                            ));
                            self.new_value_iter.next_inner()?;
                            self.old_value_iter.next_inner()?;
                        }
                    }
                }
            };
            break;
        }
        Ok(self
            .item_buffer
            .as_ref()
            .map(|(key, value)| (key.to_ref(), value.to_ref())))
    }
}

#[cfg(test)]
mod tests {
    use risingwave_common::util::epoch::test_epoch;

    use super::*;
    use crate::hummock::iterator::test_utils::{
        iterator_test_table_key_of, iterator_test_value_of,
    };
    use crate::hummock::test_utils::{ReadOptions, *};
    use crate::memory::sled::SledStateStore;

    #[tokio::test]
    async fn test_snapshot_isolation_memory() {
        let state_store = MemoryStateStore::new();
        test_snapshot_isolation_inner(state_store).await;
    }

    #[cfg(not(madsim))]
    #[tokio::test]
    async fn test_snapshot_isolation_sled() {
        let state_store = SledStateStore::new_temp();
        test_snapshot_isolation_inner(state_store).await;
    }

    async fn test_snapshot_isolation_inner(state_store: RangeKvStateStore<impl RangeKv>) {
        state_store
            .ingest_batch(
                vec![
                    (
                        TableKey(Bytes::from(b"a".to_vec())),
                        StorageValue::new_put(b"v1".to_vec()),
                    ),
                    (
                        TableKey(Bytes::from(b"b".to_vec())),
                        StorageValue::new_put(b"v1".to_vec()),
                    ),
                ],
                vec![],
                0,
                Default::default(),
            )
            .unwrap();
        state_store
            .ingest_batch(
                vec![
                    (
                        TableKey(Bytes::from(b"a".to_vec())),
                        StorageValue::new_put(b"v2".to_vec()),
                    ),
                    (
                        TableKey(Bytes::from(b"b".to_vec())),
                        StorageValue::new_delete(),
                    ),
                ],
                vec![],
                test_epoch(1),
                Default::default(),
            )
            .unwrap();
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    0,
                    TableId::default(),
                    None,
                )
                .unwrap(),
            vec![
                (
                    FullKey::for_test(Default::default(), Bytes::from("a"), 0)
                        .encode()
                        .into(),
                    b"v1".to_vec().into()
                ),
                (
                    FullKey::for_test(Default::default(), Bytes::from("b"), 0)
                        .encode()
                        .into(),
                    b"v1".to_vec().into()
                )
            ]
        );
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    0,
                    TableId::default(),
                    Some(1),
                )
                .unwrap(),
            vec![(
                FullKey::for_test(Default::default(), b"a".to_vec(), 0)
                    .encode()
                    .into(),
                b"v1".to_vec().into()
            )]
        );
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    test_epoch(1),
                    TableId::default(),
                    None,
                )
                .unwrap(),
            vec![(
                FullKey::for_test(Default::default(), b"a".to_vec(), test_epoch(1))
                    .encode()
                    .into(),
                b"v2".to_vec().into()
            )]
        );
        assert_eq!(
            state_store
                .get(TableKey(Bytes::from("a")), 0, ReadOptions::default())
                .await
                .unwrap(),
            Some(Bytes::from("v1"))
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"b")),
                    0,
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            Some(b"v1".to_vec().into())
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"c")),
                    0,
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            None
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"a")),
                    test_epoch(1),
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            Some(b"v2".to_vec().into())
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::from("b")),
                    test_epoch(1),
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            None
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::from("c")),
                    test_epoch(1),
                    ReadOptions::default()
                )
                .await
                .unwrap(),
            None
        );
    }

    #[tokio::test]
    async fn test_iter_log_memory() {
        let state_store = MemoryStateStore::new();
        test_iter_log_inner(state_store).await;
    }

    #[cfg(not(madsim))]
    #[tokio::test]
    async fn test_iter_log_sled() {
        let state_store = SledStateStore::new_temp();
        test_iter_log_inner(state_store).await;
    }

    async fn test_iter_log_inner(state_store: RangeKvStateStore<impl RangeKv>) {
        let table_id = TableId::new(233);
        let epoch1 = test_epoch(1);
        let key_idx = [1, 2, 4];
        let make_key = |i| TableKey(Bytes::from(iterator_test_table_key_of(i)));
        let make_value = |i| Bytes::from(iterator_test_value_of(i));
        state_store
            .ingest_batch(
                key_idx
                    .iter()
                    .map(|i| (make_key(*i), StorageValue::new_put(make_value(*i))))
                    .collect(),
                vec![],
                epoch1,
                table_id,
            )
            .unwrap();
        {
            let mut iter = state_store
                .iter_log(
                    (epoch1, epoch1),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            for i in key_idx {
                let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(make_key(i).to_ref(), iter_key);
                assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }

        let epoch2 = test_epoch(2);
        state_store
            .ingest_batch(
                vec![
                    (make_key(1), StorageValue::new_put(make_value(12))),
                    (make_key(2), StorageValue::new_delete()),
                    (make_key(3), StorageValue::new_put(make_value(3))),
                ],
                vec![],
                epoch2,
                table_id,
            )
            .unwrap();

        {
            let expected = vec![
                (
                    make_key(1),
                    ChangeLogValue::Update {
                        new_value: make_value(12),
                        old_value: make_value(1),
                    },
                ),
                (make_key(2), ChangeLogValue::Delete(make_value(2))),
                (make_key(3), ChangeLogValue::Insert(make_value(3))),
            ];
            let mut iter = state_store
                .iter_log(
                    (epoch2, epoch2),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            for (key, change_log_value) in expected {
                let (iter_key, iter_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(
                    key.to_ref(),
                    iter_key,
                    "{:?} {:?}",
                    change_log_value.to_ref(),
                    iter_value
                );
                assert_eq!(change_log_value.to_ref(), iter_value);
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }
        {
            let mut iter = state_store
                .iter_log(
                    (epoch1, epoch1),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            for i in key_idx {
                let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(make_key(i).to_ref(), iter_key);
                assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }
        {
            let mut iter = state_store
                .iter_log(
                    (epoch1, epoch2),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
            assert_eq!(make_key(1).to_ref(), iter_key);
            assert_eq!(
                change_value,
                ChangeLogValue::Insert(make_value(12).as_ref())
            );
            for i in [3, 4] {
                let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(make_key(i).to_ref(), iter_key);
                assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }
    }
}