1use std::cmp::Ordering;
16use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
17use std::mem::take;
18use std::ops::Bound::{Excluded, Included, Unbounded};
19use std::ops::{Bound, RangeBounds};
20use std::sync::{Arc, LazyLock};
21
22use bytes::Bytes;
23use itertools::Itertools;
24use parking_lot::RwLock;
25use risingwave_common::array::VectorRef;
26use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
27use risingwave_common::catalog::TableId;
28use risingwave_common::dispatch_distance_measurement;
29use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
30use risingwave_common::id::FragmentId;
31use risingwave_common::types::ScalarRef;
32use risingwave_common::util::epoch::{EpochPair, MAX_EPOCH};
33use risingwave_hummock_sdk::key::{
34 FullKey, TableKey, TableKeyRange, UserKey, prefixed_range_with_vnode,
35};
36use risingwave_hummock_sdk::table_watermark::WatermarkDirection;
37use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
38use thiserror_ext::AsReport;
39use tokio::task::yield_now;
40use tracing::error;
41
42use crate::error::StorageResult;
43use crate::hummock::HummockError;
44use crate::hummock::utils::{
45 do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, merge_stream,
46 sanity_check_enabled,
47};
48use crate::mem_table::{KeyOp, MemTable};
49use crate::storage_value::StorageValue;
50use crate::store::*;
51use crate::vector::{MeasureDistanceBuilder, NearestBuilder};
52
/// A full key (table id + table key + epoch) whose key payload is owned `Bytes`.
pub type BytesFullKey = FullKey<Bytes>;
/// A key range over [`BytesFullKey`]s with independent lower/upper bounds.
pub type BytesFullKeyRange = (Bound<BytesFullKey>, Bound<BytesFullKey>);
55
/// An ordered key-value engine used as the backing storage of [`RangeKvStateStore`].
///
/// Keys are [`BytesFullKey`]s; a value of `None` represents a tombstone (a
/// delete that still occupies a versioned slot).
pub trait RangeKv: Clone + Send + Sync + 'static {
    /// Scans `range` in ascending key order, returning at most `limit` entries
    /// (all matching entries when `limit` is `None`).
    fn range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>>;

    /// Scans `range` in descending key order, returning at most `limit` entries.
    fn rev_range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>>;

    /// Writes a batch of key-value pairs; `None` values insert tombstones.
    fn ingest_batch(
        &self,
        kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
    ) -> StorageResult<()>;

    /// Persists buffered writes where the backend supports it.
    fn flush(&self) -> StorageResult<()>;
}
76
77pub type BTreeMapRangeKv = Arc<RwLock<BTreeMap<BytesFullKey, Option<Bytes>>>>;
78
79impl RangeKv for BTreeMapRangeKv {
80 fn range(
81 &self,
82 range: BytesFullKeyRange,
83 limit: Option<usize>,
84 ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
85 let limit = limit.unwrap_or(usize::MAX);
86 Ok(self
87 .read()
88 .range(range)
89 .take(limit)
90 .map(|(key, value)| (key.clone(), value.clone()))
91 .collect())
92 }
93
94 fn rev_range(
95 &self,
96 range: BytesFullKeyRange,
97 limit: Option<usize>,
98 ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
99 let limit = limit.unwrap_or(usize::MAX);
100 Ok(self
101 .read()
102 .range(range)
103 .rev()
104 .take(limit)
105 .map(|(key, value)| (key.clone(), value.clone()))
106 .collect())
107 }
108
109 fn ingest_batch(
110 &self,
111 kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
112 ) -> StorageResult<()> {
113 let mut inner = self.write();
114 for (key, value) in kv_pairs {
115 inner.insert(key, value);
116 }
117 Ok(())
118 }
119
120 fn flush(&self) -> StorageResult<()> {
121 Ok(())
122 }
123}
124
125pub mod sled {
126 use std::fs::create_dir_all;
127 use std::ops::RangeBounds;
128
129 use bytes::Bytes;
130 use risingwave_hummock_sdk::key::FullKey;
131
132 use crate::error::StorageResult;
133 use crate::memory::{BytesFullKey, BytesFullKeyRange, RangeKv, RangeKvStateStore};
134
    /// A [`RangeKv`] backend persisted in a local sled database.
    #[derive(Clone)]
    pub struct SledRangeKv {
        // Handle to the underlying sled database; `sled::Db` is internally shared,
        // so cloning `SledRangeKv` shares the same on-disk store.
        inner: sled::Db,
    }

    impl SledRangeKv {
        /// Opens (or creates) a sled database at `path`.
        pub fn new(path: impl AsRef<std::path::Path>) -> Self {
            SledRangeKv {
                inner: sled::open(path).expect("open"),
            }
        }

        /// Opens a sled database in a fresh directory under `./.risingwave/sled`.
        /// `keep()` detaches the directory from the `TempDir` handle so the
        /// database outlives it (the directory is intentionally not cleaned up).
        pub fn new_temp() -> Self {
            create_dir_all("./.risingwave/sled").expect("should create");
            let path = tempfile::TempDir::new_in("./.risingwave/sled")
                .expect("find temp dir")
                .keep();
            Self::new(path)
        }
    }
155
    // One-byte tag prepended to every stored sled value: `EMPTY` marks a
    // tombstone (deleted entry); `NON_EMPTY` is followed by the value bytes.
    const EMPTY: u8 = 1;
    const NON_EMPTY: u8 = 0;
158
159 impl RangeKv for SledRangeKv {
160 fn range(
161 &self,
162 range: BytesFullKeyRange,
163 limit: Option<usize>,
164 ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
165 let (left, right) = range;
166 let full_key_ref_bound = (
167 left.as_ref().map(FullKey::to_ref),
168 right.as_ref().map(FullKey::to_ref),
169 );
170 let left_encoded = left.as_ref().map(|key| key.to_ref().encode_reverse_epoch());
171 let right_encoded = right
172 .as_ref()
173 .map(|key| key.to_ref().encode_reverse_epoch());
174 let limit = limit.unwrap_or(usize::MAX);
175 let mut ret = vec![];
176 for result in self.inner.range((left_encoded, right_encoded)).take(limit) {
177 let (key, value) = result?;
178 let full_key = FullKey::decode_reverse_epoch(key.as_ref()).copy_into();
179 if !full_key_ref_bound.contains(&full_key.to_ref()) {
180 continue;
181 }
182 let value = match value.as_ref() {
183 [EMPTY] => None,
184 [NON_EMPTY, rest @ ..] => Some(Bytes::from(Vec::from(rest))),
185 _ => unreachable!("malformed value: {:?}", value),
186 };
187 ret.push((full_key, value))
188 }
189 Ok(ret)
190 }
191
192 fn rev_range(
193 &self,
194 range: BytesFullKeyRange,
195 limit: Option<usize>,
196 ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
197 let (left, right) = range;
198 let full_key_ref_bound = (
199 left.as_ref().map(FullKey::to_ref),
200 right.as_ref().map(FullKey::to_ref),
201 );
202 let left_encoded = left.as_ref().map(|key| key.to_ref().encode_reverse_epoch());
203 let right_encoded = right
204 .as_ref()
205 .map(|key| key.to_ref().encode_reverse_epoch());
206 let limit = limit.unwrap_or(usize::MAX);
207 let mut ret = vec![];
208 for result in self
209 .inner
210 .range((left_encoded, right_encoded))
211 .rev()
212 .take(limit)
213 {
214 let (key, value) = result?;
215 let full_key = FullKey::decode_reverse_epoch(key.as_ref()).copy_into();
216 if !full_key_ref_bound.contains(&full_key.to_ref()) {
217 continue;
218 }
219 let value = match value.as_ref() {
220 [EMPTY] => None,
221 [NON_EMPTY, rest @ ..] => Some(Bytes::from(Vec::from(rest))),
222 _ => unreachable!("malformed value: {:?}", value),
223 };
224 ret.push((full_key, value))
225 }
226 Ok(ret)
227 }
228
229 fn ingest_batch(
230 &self,
231 kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
232 ) -> StorageResult<()> {
233 let mut batch = sled::Batch::default();
234 for (key, value) in kv_pairs {
235 let encoded_key = key.encode_reverse_epoch();
236 let key = sled::IVec::from(encoded_key);
237 let mut buffer =
238 Vec::with_capacity(value.as_ref().map(|v| v.len()).unwrap_or_default() + 1);
239 if let Some(value) = value {
240 buffer.push(NON_EMPTY);
241 buffer.extend_from_slice(value.as_ref());
242 } else {
243 buffer.push(EMPTY);
244 }
245 let value = sled::IVec::from(buffer);
246 batch.insert(key, value);
247 }
248 self.inner.apply_batch(batch)?;
249 Ok(())
250 }
251
252 fn flush(&self) -> StorageResult<()> {
253 Ok(self.inner.flush().map(|_| {})?)
254 }
255 }
256
257 pub type SledStateStore = RangeKvStateStore<SledRangeKv>;
258
259 impl SledStateStore {
260 pub fn new(path: impl AsRef<std::path::Path>) -> Self {
261 RangeKvStateStore {
262 inner: SledRangeKv::new(path),
263 tables: Default::default(),
264 vectors: Default::default(),
265 }
266 }
267
268 pub fn new_temp() -> Self {
269 RangeKvStateStore {
270 inner: SledRangeKv::new_temp(),
271 tables: Default::default(),
272 vectors: Default::default(),
273 }
274 }
275 }
276
    #[cfg(test)]
    mod test {
        use std::ops::{Bound, RangeBounds};

        use bytes::Bytes;
        use risingwave_common::catalog::TableId;
        use risingwave_common::util::epoch::EPOCH_SPILL_TIME_MASK;
        use risingwave_hummock_sdk::EpochWithGap;
        use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};

        use crate::memory::RangeKv;
        use crate::memory::sled::SledRangeKv;

        // Demonstrates that `encode_reverse_epoch` ordering can admit keys outside
        // the logical `FullKey` range (variable-length table keys), and that
        // `SledRangeKv::range` filters those false positives out.
        #[test]
        fn test_filter_variable_key_length_false_positive() {
            let table_id = TableId::new(233);
            let epoch = u64::MAX - u64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]);
            // A short key that is OUTSIDE [left, right] logically but falls inside
            // the encoded byte range, plus a long key that is genuinely inside.
            let excluded_short_table_key = [0, 1, 0, 0];
            let included_long_table_key = [0, 1, 0, 0, 1, 2];
            let left_table_key = [0, 1, 0, 0, 1];
            let right_table_key = [0, 1, 1, 1];

            let to_full_key = |table_key: &[u8]| FullKey {
                user_key: UserKey {
                    table_id,
                    table_key: TableKey(Bytes::from(table_key.to_vec())),
                },
                epoch_with_gap: EpochWithGap::new_from_epoch(epoch & !EPOCH_SPILL_TIME_MASK),
            };

            let left_full_key = to_full_key(&left_table_key[..]);
            let right_full_key = to_full_key(&right_table_key[..]);
            let included_long_full_key = to_full_key(&included_long_table_key[..]);
            let excluded_short_full_key = to_full_key(&excluded_short_table_key[..]);

            // Logical `FullKey` bounds classify the two keys correctly...
            assert!(
                (
                    Bound::Included(left_full_key.to_ref()),
                    Bound::Included(right_full_key.to_ref())
                )
                    .contains(&included_long_full_key.to_ref())
            );
            assert!(
                !(
                    Bound::Included(left_full_key.to_ref()),
                    Bound::Included(right_full_key.to_ref())
                )
                    .contains(&excluded_short_full_key.to_ref())
            );

            let left_encoded = left_full_key.encode_reverse_epoch();
            let right_encoded = right_full_key.encode_reverse_epoch();

            // ...but the encoded byte range wrongly includes the short key too.
            assert!(
                (
                    Bound::Included(left_encoded.clone()),
                    Bound::Included(right_encoded.clone())
                )
                    .contains(&included_long_full_key.encode_reverse_epoch())
            );
            assert!(
                (
                    Bound::Included(left_encoded),
                    Bound::Included(right_encoded)
                )
                    .contains(&excluded_short_full_key.encode_reverse_epoch())
            );

            // The store-level scan must therefore re-check and return only the long key.
            let sled_range_kv = SledRangeKv::new_temp();
            sled_range_kv
                .ingest_batch(
                    vec![
                        (included_long_full_key.clone(), None),
                        (excluded_short_full_key, None),
                    ]
                    .into_iter(),
                )
                .unwrap();
            let kvs = sled_range_kv
                .range(
                    (
                        Bound::Included(left_full_key),
                        Bound::Included(right_full_key),
                    ),
                    None,
                )
                .unwrap();
            assert_eq!(1, kvs.len());
            assert_eq!(included_long_full_key.to_ref(), kvs[0].0.to_ref());
            assert!(kvs[0].1.is_none());
        }
    }
369}
370
371mod batched_iter {
372
373 use super::*;
374
    /// A pull-based iterator that reads from a [`RangeKv`] in fixed-size batches,
    /// shrinking its range after each batch so the backend is never held across
    /// the whole traversal.
    pub struct Iter<R: RangeKv> {
        inner: R,
        // Remaining range to scan; advanced past consumed keys on each refill.
        range: BytesFullKeyRange,
        // Entries of the current batch that have not been handed out yet.
        current: std::vec::IntoIter<(FullKey<Bytes>, Option<Bytes>)>,
        // `true` for a descending scan.
        rev: bool,
    }

    impl<R: RangeKv> Iter<R> {
        /// Creates a batched iterator over `range`; `rev` selects scan direction.
        pub fn new(inner: R, range: BytesFullKeyRange, rev: bool) -> Self {
            Self {
                inner,
                range,
                rev,
                current: Vec::new().into_iter(),
            }
        }
    }
398
399 impl<R: RangeKv> Iter<R> {
400 const BATCH_SIZE: usize = 256;
401
402 fn refill(&mut self) -> StorageResult<()> {
404 assert!(self.current.is_empty());
405
406 let batch = if self.rev {
407 self.inner.rev_range(
408 (self.range.0.clone(), self.range.1.clone()),
409 Some(Self::BATCH_SIZE),
410 )?
411 } else {
412 self.inner.range(
413 (self.range.0.clone(), self.range.1.clone()),
414 Some(Self::BATCH_SIZE),
415 )?
416 };
417
418 if let Some((last_key, _)) = batch.last() {
419 let full_key = FullKey::new_with_gap_epoch(
420 last_key.user_key.table_id,
421 TableKey(last_key.user_key.table_key.0.clone()),
422 last_key.epoch_with_gap,
423 );
424 if self.rev {
425 self.range.1 = Bound::Excluded(full_key);
426 } else {
427 self.range.0 = Bound::Excluded(full_key);
428 }
429 }
430 self.current = batch.into_iter();
431 Ok(())
432 }
433 }
434
435 impl<R: RangeKv> Iter<R> {
436 pub fn next(&mut self) -> StorageResult<Option<(BytesFullKey, Option<Bytes>)>> {
437 match self.current.next() {
438 Some((key, value)) => Ok(Some((key, value))),
439 None => {
440 self.refill()?;
441 Ok(self.current.next())
442 }
443 }
444 }
445 }
446
    #[cfg(test)]
    mod tests {
        use rand::Rng;

        use super::*;
        use crate::memory::sled::SledRangeKv;

        #[test]
        fn test_btreemap_iter_chaos() {
            let map = Arc::new(RwLock::new(BTreeMap::new()));
            test_iter_chaos_inner(map, 1000);
        }

        #[cfg(not(madsim))]
        #[test]
        fn test_sled_iter_chaos() {
            let map = SledRangeKv::new_temp();
            test_iter_chaos_inner(map, 100);
        }

        // Property test: for `count` random ranges, the batched `Iter` must yield
        // exactly the same sequence as a single unbatched `range` call.
        fn test_iter_chaos_inner(map: impl RangeKv, count: usize) {
            let key_range = 1..=10000;
            let num_to_bytes = |k: i32| Bytes::from(format!("{:06}", k).as_bytes().to_vec());
            let num_to_full_key =
                |k: i32| FullKey::new(TableId::default(), TableKey(num_to_bytes(k)), 0);

            // Seed the store with every key in `key_range`, value == key bytes.
            map.ingest_batch(key_range.clone().map(|k| {
                let key = num_to_full_key(k);
                let b = key.user_key.table_key.0.clone();

                (key, Some(b))
            }))
            .unwrap();

            let rand_bound = || {
                let key = rand::rng().random_range(key_range.clone());
                let key = num_to_full_key(key);
                match rand::rng().random_range(1..=5) {
                    1 | 2 => Bound::Included(key),
                    3 | 4 => Bound::Excluded(key),
                    _ => Bound::Unbounded,
                }
            };

            for _ in 0..count {
                // Re-sample until the pair forms a valid range (not inverted and
                // not the empty `Excluded(x)..Excluded(x)` case).
                let range = loop {
                    let range = (rand_bound(), rand_bound());
                    let (start, end) = (range.start_bound(), range.end_bound());

                    match (start, end) {
                        (Bound::Excluded(s), Bound::Excluded(e)) if s == e => {
                            continue;
                        }
                        (
                            Bound::Included(s) | Bound::Excluded(s),
                            Bound::Included(e) | Bound::Excluded(e),
                        ) if s > e => {
                            continue;
                        }
                        _ => break range,
                    }
                };

                // Drain through the batched iterator...
                let v1 = {
                    let mut v = vec![];
                    let mut iter = Iter::new(map.clone(), range.clone(), false);
                    while let Some((key, value)) = iter.next().unwrap() {
                        v.push((key, value));
                    }
                    v
                };
                // ...and compare against one unlimited scan of the same range.
                let v2 = map.range(range, None).unwrap();

                assert_eq!(v1, v2);
            }
        }
    }
526}
527
528pub type MemoryStateStore = RangeKvStateStore<BTreeMapRangeKv>;
529
/// Per-table epoch bookkeeping used to emulate epoch sealing/waiting semantics.
struct TableState {
    // Epoch the table was first initialized at.
    init_epoch: u64,
    // Maps each epoch to its successor, as reported via `init`/`seal_current_epoch`.
    next_epochs: BTreeMap<u64, u64>,
    // Greatest epoch sealed by all vnodes, if any epoch is fully sealed yet.
    latest_sealed_epoch: Option<u64>,
    // Epochs sealed by only a subset of vnodes so far, with the sealing vnodes.
    sealing_epochs: BTreeMap<u64, BitmapBuilder>,
}
536
impl TableState {
    /// Fresh state for a table first seen at `init_epoch`.
    fn new(init_epoch: u64) -> Self {
        Self {
            init_epoch,
            next_epochs: Default::default(),
            latest_sealed_epoch: None,
            sealing_epochs: Default::default(),
        }
    }

    /// Spin-waits (yielding to the runtime between polls) until `epoch` of
    /// `table_id` is the table's init epoch or has been sealed by all vnodes.
    ///
    /// Panics if the table is unknown or `epoch` precedes its init epoch.
    async fn wait_epoch(
        tables: &parking_lot::Mutex<HashMap<TableId, Self>>,
        table_id: TableId,
        epoch: u64,
    ) {
        loop {
            {
                // Scope the lock so it is not held across the `yield_now` await point.
                let tables = tables.lock();
                let table_state = tables.get(&table_id).expect("should exist");
                assert!(epoch >= table_state.init_epoch);
                if epoch == table_state.init_epoch {
                    return;
                }
                if let Some(latest_sealed_epoch) = table_state.latest_sealed_epoch
                    && latest_sealed_epoch >= epoch
                {
                    return;
                }
            }
            yield_now().await;
        }
    }
}
570
571type InMemVectorStore = Arc<RwLock<HashMap<TableId, Vec<(Vector, Bytes, u64)>>>>;
572
/// An in-memory state store backed by a generic [`RangeKv`] engine.
///
/// Cloning is shallow: all clones share the same underlying storage, epoch
/// bookkeeping and vector store.
#[derive(Clone, Default)]
pub struct RangeKvStateStore<R: RangeKv> {
    // The backing ordered key-value engine.
    inner: R,
    // Per-table epoch bookkeeping (init/seal/wait emulation).
    tables: Arc<parking_lot::Mutex<HashMap<TableId, TableState>>>,

    // Vectors ingested per table, for nearest-neighbour queries.
    vectors: InMemVectorStore,
}
587
588fn to_full_key_range<R, B>(table_id: TableId, table_key_range: R) -> BytesFullKeyRange
589where
590 R: RangeBounds<B> + Send,
591 B: AsRef<[u8]>,
592{
593 let start = match table_key_range.start_bound() {
594 Included(k) => Included(FullKey::new(
595 table_id,
596 TableKey(Bytes::from(k.as_ref().to_vec())),
597 HummockEpoch::MAX,
598 )),
599 Excluded(k) => Excluded(FullKey::new(
600 table_id,
601 TableKey(Bytes::from(k.as_ref().to_vec())),
602 0,
603 )),
604 Unbounded => Included(FullKey::new(
605 table_id,
606 TableKey(Bytes::from(b"".to_vec())),
607 HummockEpoch::MAX,
608 )),
609 };
610 let end = match table_key_range.end_bound() {
611 Included(k) => Included(FullKey::new(
612 table_id,
613 TableKey(Bytes::from(k.as_ref().to_vec())),
614 0,
615 )),
616 Excluded(k) => Excluded(FullKey::new(
617 table_id,
618 TableKey(Bytes::from(k.as_ref().to_vec())),
619 HummockEpoch::MAX,
620 )),
621 Unbounded => {
622 if let Some(next_table_id) = table_id.as_raw_id().checked_add(1) {
623 Excluded(FullKey::new(
624 next_table_id.into(),
625 TableKey(Bytes::from(b"".to_vec())),
626 HummockEpoch::MAX,
627 ))
628 } else {
629 Unbounded
630 }
631 }
632 };
633 (start, end)
634}
635
636impl MemoryStateStore {
637 pub fn new() -> Self {
638 Self::default()
639 }
640
641 pub fn shared() -> Self {
642 static STORE: LazyLock<MemoryStateStore> = LazyLock::new(MemoryStateStore::new);
643 STORE.clone()
644 }
645}
646
impl<R: RangeKv> RangeKvStateStore<R> {
    /// Scans `key_range` of `table_id`, returning for each user key the newest
    /// version visible at `epoch` as `(encoded full key, value)`, up to `limit`
    /// entries. Tombstoned keys are skipped but still shadow older versions.
    fn scan(
        &self,
        key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(Bytes, Bytes)>> {
        let mut data = vec![];
        if limit == Some(0) {
            return Ok(vec![]);
        }
        let mut last_user_key = None;
        for (key, value) in self
            .inner
            .range(to_full_key_range(table_id, key_range), None)?
        {
            // Skip versions written after the read epoch.
            if key.epoch_with_gap.pure_epoch() > epoch {
                continue;
            }
            // Keys of one user key arrive newest-version-first (reverse-epoch key
            // order), so only the first visible version per user key counts; a
            // `None` value is a tombstone and hides the older versions.
            if Some(&key.user_key) != last_user_key.as_ref() {
                if let Some(value) = value {
                    data.push((Bytes::from(key.encode()), value.clone()));
                }
                last_user_key = Some(key.user_key.clone());
            }
            if let Some(limit) = limit
                && data.len() >= limit
            {
                break;
            }
        }
        Ok(data)
    }
}
682
/// A read-only view of a [`RangeKvStateStore`] pinned to one table and epoch.
#[derive(Clone)]
pub struct RangeKvStateStoreReadSnapshot<R: RangeKv> {
    inner: RangeKvStateStore<R>,
    // All reads through this snapshot observe versions up to this epoch.
    epoch: u64,
    table_id: TableId,
}
689
impl<R: RangeKv> StateStoreGet for RangeKvStateStoreReadSnapshot<R> {
    /// Point lookup at the snapshot's epoch; applies `on_key_value_fn` to the
    /// keyed row when the key exists and is not tombstoned.
    async fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        _read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        self.inner
            .get_keyed_row_impl(key, self.epoch, self.table_id)
            .and_then(|option| {
                if let Some((key, value)) = option {
                    on_key_value_fn(key.to_ref(), value.as_ref()).map(Some)
                } else {
                    Ok(None)
                }
            })
    }
}
708
impl<R: RangeKv> StateStoreRead for RangeKvStateStoreReadSnapshot<R> {
    type Iter = RangeKvStateStoreIter<R>;
    type RevIter = RangeKvStateStoreRevIter<R>;

    /// Forward iterator over `key_range` at the snapshot's epoch.
    async fn iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::Iter> {
        self.inner.iter_impl(key_range, self.epoch, self.table_id)
    }

    /// Reverse iterator over `key_range` at the snapshot's epoch.
    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter> {
        self.inner
            .rev_iter_impl(key_range, self.epoch, self.table_id)
    }
}
730
impl<R: RangeKv> StateStoreReadVector for RangeKvStateStoreReadSnapshot<R> {
    /// Nearest-neighbour search over vectors written at or before the snapshot
    /// epoch, using the distance measure selected in `options`.
    async fn nearest<'a, O: Send + 'a>(
        &'a self,
        vec: VectorRef<'a>,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
    ) -> StorageResult<Vec<O>> {
        // Monomorphized per distance measurement via `dispatch_distance_measurement!`.
        fn nearest_impl<'a, M: MeasureDistanceBuilder, O>(
            store: &'a InMemVectorStore,
            epoch: u64,
            table_id: TableId,
            vec: VectorRef<'a>,
            options: VectorNearestOptions,
            on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
        ) -> Vec<O> {
            let mut builder = NearestBuilder::<'_, O, M>::new(vec, options.top_n);
            builder.add(
                store
                    .read()
                    .get(&table_id)
                    .map(|vec| vec.iter())
                    .into_iter()
                    .flatten()
                    // Only vectors written at or before the snapshot epoch are visible.
                    .filter(|(_, _, vector_epoch)| epoch >= *vector_epoch)
                    .map(|(vec, info, _)| (vec.to_ref(), info.as_ref())),
                on_nearest_item_fn,
            );
            builder.finish()
        }
        dispatch_distance_measurement!(options.measure, MeasurementType, {
            Ok(nearest_impl::<MeasurementType, O>(
                &self.inner.vectors,
                self.epoch,
                self.table_id,
                vec,
                options,
                on_nearest_item_fn,
            ))
        })
    }
}
772
impl<R: RangeKv> RangeKvStateStore<R> {
    /// Point lookup: the newest version of `key` visible at `epoch`, or `None`
    /// when the key is absent or tombstoned.
    fn get_keyed_row_impl(
        &self,
        key: TableKey<Bytes>,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<Option<StateStoreKeyedRow>> {
        // A single-key inclusive range with `limit == 1` yields at most one row.
        let range_bounds = (Bound::Included(key.clone()), Bound::Included(key));
        let res = self.scan(range_bounds, epoch, table_id, Some(1))?;

        Ok(match res.as_slice() {
            [] => None,
            [(key, value)] => Some((
                FullKey::decode(key.as_ref()).to_vec().into_bytes(),
                value.clone(),
            )),
            _ => unreachable!(),
        })
    }

    /// Forward iterator over `key_range` visible at `epoch` (inclusive).
    fn iter_impl(
        &self,
        key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<RangeKvStateStoreIter<R>> {
        Ok(RangeKvStateStoreIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(table_id, key_range),
                false,
            ),
            epoch,
            true,
        ))
    }

    /// Reverse iterator over `key_range` visible at `epoch` (inclusive).
    fn rev_iter_impl(
        &self,
        key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<RangeKvStateStoreRevIter<R>> {
        Ok(RangeKvStateStoreRevIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(table_id, key_range),
                true,
            ),
            epoch,
            true,
        ))
    }
}
828
impl<R: RangeKv> StateStoreReadLog for RangeKvStateStore<R> {
    type ChangeLogIter = RangeKvStateStoreChangeLogIter<R>;

    /// Resolves the epoch that follows `epoch` for the table, spin-waiting
    /// (with runtime yields) until the transition has been registered through
    /// `init`/`seal_current_epoch`. Errors if the table was never initialized.
    async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
        loop {
            {
                // Scope the lock so it is not held across the `yield_now` await point.
                let tables = self.tables.lock();
                let Some(tables) = tables.get(&options.table_id) else {
                    return Err(HummockError::next_epoch(format!(
                        "table {} not exist",
                        options.table_id
                    ))
                    .into());
                };
                if let Some(next_epoch) = tables.next_epochs.get(&epoch) {
                    break Ok(*next_epoch);
                }
            }
            yield_now().await;
        }
    }

    /// Builds a change-log iterator for `[min_epoch, max_epoch]`: new values are
    /// read up to `max_epoch` inclusively, old values with an exclusive epoch
    /// bound at `min_epoch` (note the differing `is_inclusive_epoch` flags).
    async fn iter_log(
        &self,
        (min_epoch, max_epoch): (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> StorageResult<Self::ChangeLogIter> {
        let new_value_iter = RangeKvStateStoreIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(options.table_id, key_range.clone()),
                false,
            ),
            max_epoch,
            true,
        );
        let old_value_iter = RangeKvStateStoreIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(options.table_id, key_range),
                false,
            ),
            min_epoch,
            false,
        );
        RangeKvStateStoreChangeLogIter::new(new_value_iter, old_value_iter)
    }
}
878
impl<R: RangeKv> RangeKvStateStore<R> {
    /// Creates a read snapshot of this store pinned to (`epoch`, `table_id`).
    fn new_read_snapshot_impl(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> RangeKvStateStoreReadSnapshot<R> {
        RangeKvStateStoreReadSnapshot {
            inner: self.clone(),
            epoch,
            table_id,
        }
    }

    /// Writes `kv_pairs` at `epoch`, first expanding `delete_ranges` into
    /// per-key tombstones for every existing key found inside them.
    /// Returns the total key + value byte size ingested.
    pub(crate) fn ingest_batch(
        &self,
        mut kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
        delete_ranges: Vec<(Bound<Bytes>, Bound<Bytes>)>,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<usize> {
        let mut delete_keys = BTreeSet::new();
        for del_range in delete_ranges {
            // Collect every user key currently present in the delete range.
            // NOTE(review): the range bounds are built with the write `epoch`;
            // presumably fine since the scan only needs user-key coverage —
            // confirm against `FullKey` ordering for edge versions.
            for (key, _) in self.inner.range(
                (
                    del_range
                        .0
                        .map(|table_key| FullKey::new(table_id, TableKey(table_key), epoch)),
                    del_range
                        .1
                        .map(|table_key| FullKey::new(table_id, TableKey(table_key), epoch)),
                ),
                None,
            )? {
                delete_keys.insert(key.user_key.table_key);
            }
        }
        for key in delete_keys {
            kv_pairs.push((key, StorageValue::new_delete()));
        }

        let mut size = 0;
        self.inner
            .ingest_batch(kv_pairs.into_iter().map(|(key, value)| {
                size += key.len() + value.size();
                (FullKey::new(table_id, key, epoch), value.user_value)
            }))?;
        Ok(size)
    }

    /// Appends vectors for `table_id`, tagging each with the writing epoch.
    fn ingest_vectors(&self, table_id: TableId, epoch: u64, vecs: Vec<(Vector, Bytes)>) {
        self.vectors
            .write()
            .entry(table_id)
            .or_default()
            .extend(vecs.into_iter().map(|(vec, info)| (vec, info, epoch)));
    }
}
936
impl<R: RangeKv> StateStore for RangeKvStateStore<R> {
    type Local = RangeKvLocalStateStore<R>;
    type ReadSnapshot = RangeKvStateStoreReadSnapshot<R>;
    type VectorWriter = RangeKvLocalStateStore<R>;

    /// No-op: there is no upload pipeline, so every flushed epoch is
    /// immediately readable.
    async fn try_wait_epoch(
        &self,
        _epoch: HummockReadEpoch,
        _options: TryWaitEpochOptions,
    ) -> StorageResult<()> {
        Ok(())
    }

    async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
        RangeKvLocalStateStore::new(self.clone(), option)
    }

    async fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> StorageResult<Self::ReadSnapshot> {
        Ok(self.new_read_snapshot_impl(epoch.get_epoch(), options.table_id))
    }

    /// Vector writers are modeled as local stores over a single-vnode bitmap.
    async fn new_vector_writer(&self, options: NewVectorWriterOptions) -> Self::VectorWriter {
        RangeKvLocalStateStore::new(
            self.clone(),
            NewLocalOptions {
                table_id: options.table_id,
                fragment_id: FragmentId::default(),
                op_consistency_level: Default::default(),
                table_option: Default::default(),
                is_replicated: false,
                vnodes: Arc::new(Bitmap::from_bool_slice(&[true])),
                upload_on_flush: true,
            },
        )
    }
}
978
/// A per-executor local state store over a shared [`RangeKvStateStore`].
///
/// Writes are staged in a [`MemTable`] and merged into the shared store on flush.
pub struct RangeKvLocalStateStore<R: RangeKv> {
    // Staged, not-yet-flushed key ops.
    mem_table: MemTable,
    // Staged vector inserts, flushed together with `mem_table`.
    vectors: Vec<(Vector, Bytes)>,
    inner: RangeKvStateStore<R>,

    // Current/previous epoch pair; `None` until `init` is called.
    epoch: Option<EpochPair>,

    table_id: TableId,
    op_consistency_level: OpConsistencyLevel,
    // Vnodes owned by this local store instance.
    vnodes: Arc<Bitmap>,
}
990
991impl<R: RangeKv> RangeKvLocalStateStore<R> {
992 pub fn new(inner: RangeKvStateStore<R>, option: NewLocalOptions) -> Self {
993 Self {
994 inner,
995 mem_table: MemTable::new(option.table_id, option.op_consistency_level.clone()),
996 epoch: None,
997 table_id: option.table_id,
998 op_consistency_level: option.op_consistency_level,
999 vnodes: option.vnodes,
1000 vectors: vec![],
1001 }
1002 }
1003
1004 fn epoch(&self) -> u64 {
1005 self.epoch.expect("should have set the epoch").curr
1006 }
1007}
1008
impl<R: RangeKv> StateStoreGet for RangeKvLocalStateStore<R> {
    /// Point lookup that observes this store's uncommitted writes: the staged
    /// mem-table is consulted first, then the flushed shared store.
    async fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        _read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        if let Some((key, value)) = match self.mem_table.buffer.get(&key) {
            // Not staged locally: read the flushed value at the current epoch.
            None => self
                .inner
                .get_keyed_row_impl(key, self.epoch(), self.table_id)?,
            Some(op) => match op {
                // A staged insert/update shadows the flushed value.
                KeyOp::Insert(value) | KeyOp::Update((_, value)) => Some((
                    FullKey::new(self.table_id, key, self.epoch()),
                    value.clone(),
                )),
                // A staged delete hides the flushed value.
                KeyOp::Delete(_) => None,
            },
        } {
            Ok(Some(on_key_value_fn(key.to_ref(), value.as_ref())?))
        } else {
            Ok(None)
        }
    }
}
1034
impl<R: RangeKv> LocalStateStore for RangeKvLocalStateStore<R> {
    type FlushedSnapshotReader = RangeKvStateStoreReadSnapshot<R>;

    type Iter<'a> = impl StateStoreIter + 'a;
    type RevIter<'a> = impl StateStoreIter + 'a;

    /// Forward iterator merging staged mem-table entries with the flushed store.
    async fn iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::Iter<'_>> {
        let iter = self
            .inner
            .iter_impl(key_range.clone(), self.epoch(), self.table_id)?;
        Ok(FromStreamStateStoreIter::new(Box::pin(merge_stream(
            self.mem_table.iter(key_range),
            iter.into_stream(to_owned_item),
            self.table_id,
            self.epoch(),
            false,
        ))))
    }

    /// Reverse iterator merging staged mem-table entries with the flushed store.
    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter<'_>> {
        let iter = self
            .inner
            .rev_iter_impl(key_range.clone(), self.epoch(), self.table_id)?;
        Ok(FromStreamStateStoreIter::new(Box::pin(merge_stream(
            self.mem_table.rev_iter(key_range),
            iter.into_stream(to_owned_item),
            self.table_id,
            self.epoch(),
            true,
        ))))
    }

    /// Stages an insert (or an update when `old_val` is given) in the mem-table.
    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()> {
        match old_val {
            None => self.mem_table.insert(key, new_val)?,
            Some(old_val) => self.mem_table.update(key, old_val, new_val)?,
        };
        Ok(())
    }

    /// Stages a delete in the mem-table.
    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
        Ok(self.mem_table.delete(key, old_val)?)
    }

    /// Swaps in a new vnode bitmap, returning the previous one. Multi-vnode
    /// tables first wait until the previous epoch is sealed by every vnode.
    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
        if self.vnodes.len() > 1 {
            TableState::wait_epoch(
                &self.inner.tables,
                self.table_id,
                self.epoch.expect("should have init").prev,
            )
            .await;
        }
        Ok(std::mem::replace(&mut self.vnodes, vnodes))
    }

    /// Per-vnode table watermarks are not tracked by the in-memory store.
    fn get_table_watermark(&self, _vnode: VirtualNode) -> Option<Bytes> {
        None
    }

    /// Snapshot over flushed data only (staged mem-table writes are invisible).
    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
        self.inner.new_read_snapshot_impl(MAX_EPOCH, self.table_id)
    }
}
1113
1114impl<R: RangeKv> StateStoreWriteEpochControl for RangeKvLocalStateStore<R> {
1115 async fn flush(&mut self) -> StorageResult<usize> {
1116 let buffer = self.mem_table.drain().into_parts();
1117 let mut kv_pairs = Vec::with_capacity(buffer.len());
1118 let sanity_check_read_snapshot = if sanity_check_enabled() {
1119 Some(self.inner.new_read_snapshot_impl(MAX_EPOCH, self.table_id))
1120 } else {
1121 None
1122 };
1123 for (key, key_op) in buffer {
1124 match key_op {
1125 KeyOp::Insert(value) => {
1129 if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
1130 do_insert_sanity_check(
1131 self.table_id,
1132 &key,
1133 &value,
1134 sanity_check_read_snapshot,
1135 &self.op_consistency_level,
1136 )
1137 .await?;
1138 }
1139 kv_pairs.push((key, StorageValue::new_put(value)));
1140 }
1141 KeyOp::Delete(old_value) => {
1142 if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
1143 do_delete_sanity_check(
1144 self.table_id,
1145 &key,
1146 &old_value,
1147 sanity_check_read_snapshot,
1148 &self.op_consistency_level,
1149 )
1150 .await?;
1151 }
1152 kv_pairs.push((key, StorageValue::new_delete()));
1153 }
1154 KeyOp::Update((old_value, new_value)) => {
1155 if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
1156 do_update_sanity_check(
1157 self.table_id,
1158 &key,
1159 &old_value,
1160 &new_value,
1161 sanity_check_read_snapshot,
1162 &self.op_consistency_level,
1163 )
1164 .await?;
1165 }
1166 kv_pairs.push((key, StorageValue::new_put(new_value)));
1167 }
1168 }
1169 }
1170 let epoch = self.epoch();
1171 self.inner
1172 .ingest_vectors(self.table_id, epoch, take(&mut self.vectors));
1173 self.inner
1174 .ingest_batch(kv_pairs, vec![], epoch, self.table_id)
1175 }
1176
1177 async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
1178 assert_eq!(
1179 self.epoch.replace(options.epoch),
1180 None,
1181 "epoch in local state store of table id {:?} is init for more than once",
1182 self.table_id
1183 );
1184 self.inner
1185 .tables
1186 .lock()
1187 .entry(self.table_id)
1188 .or_insert_with(|| TableState::new(options.epoch.prev))
1189 .next_epochs
1190 .insert(options.epoch.prev, options.epoch.curr);
1191 if self.vnodes.len() > 1 {
1192 TableState::wait_epoch(&self.inner.tables, self.table_id, options.epoch.prev).await;
1193 }
1194
1195 Ok(())
1196 }
1197
    /// Seals the current epoch and advances this instance to `next_epoch`.
    ///
    /// Updates the shared per-table state (epoch chain and per-vnode sealing
    /// bitmap) and, if requested via `opts.table_watermarks`, writes the
    /// delete ranges implied by the table watermarks.
    ///
    /// # Panics
    /// Panics if the mem-table still holds unflushed writes, if `init` was
    /// never called, or if `next_epoch` does not strictly increase.
    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
        // All buffered writes must have been flushed before sealing.
        assert!(!self.mem_table.is_dirty());
        if let Some(value_checker) = opts.switch_op_consistency_level {
            self.mem_table.op_consistency_level.update(&value_checker);
        }
        // Advance the local epoch pair: curr becomes prev, next becomes curr.
        let epoch = self
            .epoch
            .as_mut()
            .expect("should have init epoch before seal the first epoch");
        let prev_epoch = epoch.curr;
        epoch.prev = prev_epoch;
        epoch.curr = next_epoch;
        assert!(
            next_epoch > prev_epoch,
            "new epoch {} should be greater than current epoch: {}",
            next_epoch,
            prev_epoch
        );

        // NOTE: the table-state lock is held for the remainder of this method,
        // including the watermark ingest below.
        let mut tables = self.inner.tables.lock();
        let table_state = tables
            .get_mut(&self.table_id)
            .expect("should be set when init");

        table_state.next_epochs.insert(prev_epoch, next_epoch);
        if self.vnodes.len() > 1 {
            // Mark this instance's vnodes as having sealed `prev_epoch`.
            let sealing_epoch_vnodes = table_state
                .sealing_epochs
                .entry(prev_epoch)
                .or_insert_with(|| BitmapBuilder::zeroed(self.vnodes.len()));
            assert_eq!(self.vnodes.len(), sealing_epoch_vnodes.len());
            for vnode in self.vnodes.iter_ones() {
                // Each vnode may be sealed at most once per epoch.
                assert!(!sealing_epoch_vnodes.is_set(vnode));
                sealing_epoch_vnodes.set(vnode, true);
            }
            // Once every vnode of the table has sealed `prev_epoch`, the epoch
            // becomes the table's latest fully-sealed epoch.
            if (0..self.vnodes.len()).all(|vnode| sealing_epoch_vnodes.is_set(vnode)) {
                let (all_sealed_epoch, _) =
                    table_state.sealing_epochs.pop_first().expect("non-empty");
                assert_eq!(
                    all_sealed_epoch, prev_epoch,
                    "new all_sealed_epoch must be the current prev epoch"
                );
                if let Some(prev_latest_sealed_epoch) =
                    table_state.latest_sealed_epoch.replace(prev_epoch)
                {
                    // Sealed epochs must be monotonically increasing.
                    assert!(prev_epoch > prev_latest_sealed_epoch);
                }
            }
        }

        if let Some((direction, watermarks, _watermark_type)) = opts.table_watermarks {
            // Translate each vnode watermark into per-vnode key delete ranges:
            // ascending watermarks invalidate keys below the watermark,
            // descending ones invalidate keys above it.
            let delete_ranges = watermarks
                .iter()
                .flat_map(|vnode_watermark| {
                    let inner_range = match direction {
                        WatermarkDirection::Ascending => {
                            (Unbounded, Excluded(vnode_watermark.watermark().clone()))
                        }
                        WatermarkDirection::Descending => {
                            (Excluded(vnode_watermark.watermark().clone()), Unbounded)
                        }
                    };
                    vnode_watermark
                        .vnode_bitmap()
                        .iter_vnodes()
                        .map(move |vnode| {
                            let (start, end) =
                                prefixed_range_with_vnode(inner_range.clone(), vnode);
                            (start.map(|key| key.0), end.map(|key| key.0))
                        })
                })
                .collect_vec();
            // Best-effort: a failed watermark write is logged, not propagated.
            if let Err(e) =
                self.inner
                    .ingest_batch(Vec::new(), delete_ranges, self.epoch(), self.table_id)
            {
                error!(error = %e.as_report(), "failed to write delete ranges of table watermark");
            }
        }
    }
1278
    /// No-op for this backend: writes are only persisted by `flush`, and the
    /// in-memory store has no buffer pressure that would require an early,
    /// opportunistic flush.
    async fn try_flush(&mut self) -> StorageResult<()> {
        Ok(())
    }
1282}
1283
// Vector writes are buffered locally in `self.vectors` and persisted together
// with the key-value writes (see `ingest_vectors` during `flush`).
impl<R: RangeKv> StateStoreWriteVector for RangeKvLocalStateStore<R> {
    /// Buffers an owned copy of `vec` together with its `info` payload.
    /// Never fails.
    fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()> {
        self.vectors.push((vec.to_owned_scalar(), info));
        Ok(())
    }
}
1290
/// Forward iterator over a `RangeKv` snapshot that yields at most one version
/// (the first visible one encountered) per user key and skips tombstones.
pub struct RangeKvStateStoreIter<R: RangeKv> {
    inner: batched_iter::Iter<R>,

    /// Read epoch; versions with a newer epoch are invisible.
    epoch: HummockEpoch,
    /// Whether entries written exactly at `epoch` are visible.
    is_inclusive_epoch: bool,

    /// Last user key seen, used to de-duplicate multiple versions of a key.
    last_key: Option<UserKey<Bytes>>,

    /// Row returned by the next `try_next` call, if any.
    item_buffer: Option<StateStoreKeyedRow>,
}
1301
1302impl<R: RangeKv> RangeKvStateStoreIter<R> {
1303 pub fn new(
1304 inner: batched_iter::Iter<R>,
1305 epoch: HummockEpoch,
1306 is_inclusive_epoch: bool,
1307 ) -> Self {
1308 Self {
1309 inner,
1310 epoch,
1311 is_inclusive_epoch,
1312 last_key: None,
1313 item_buffer: None,
1314 }
1315 }
1316}
1317
1318impl<R: RangeKv> StateStoreIter for RangeKvStateStoreIter<R> {
1319 async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
1320 self.next_inner()?;
1321 Ok(self
1322 .item_buffer
1323 .as_ref()
1324 .map(|(key, value)| (key.to_ref(), value.as_ref())))
1325 }
1326}
1327
impl<R: RangeKv> RangeKvStateStoreIter<R> {
    /// Advances to the next visible, non-deleted user key, storing it in
    /// `item_buffer` (left as `None` when the underlying iterator is
    /// exhausted).
    fn next_inner(&mut self) -> StorageResult<()> {
        self.item_buffer = None;
        while let Some((key, value)) = self.inner.next()? {
            let epoch = key.epoch_with_gap.pure_epoch();
            // Skip versions newer than the read epoch.
            if epoch > self.epoch {
                continue;
            }
            // At exactly the read epoch, visibility depends on inclusiveness.
            if epoch == self.epoch && !self.is_inclusive_epoch {
                continue;
            }
            // Only the first visible version of each user key is considered;
            // later (older) versions of the same key are skipped.
            if Some(key.user_key.as_ref()) != self.last_key.as_ref().map(|key| key.as_ref()) {
                self.last_key = Some(key.user_key.clone());
                // A `None` value is a tombstone: remember the key so older
                // versions are suppressed, but emit nothing.
                if let Some(value) = value {
                    self.item_buffer = Some((key, value));
                    break;
                }
            }
        }
        Ok(())
    }
}
1350
/// Reverse iterator over a `RangeKv` snapshot (user keys in descending
/// order), skipping invisible versions and deleted keys.
pub struct RangeKvStateStoreRevIter<R: RangeKv> {
    inner: batched_iter::Iter<R>,

    /// Read epoch; versions with a newer epoch are invisible.
    epoch: HummockEpoch,
    /// Whether entries written exactly at `epoch` are visible.
    is_inclusive_epoch: bool,

    /// Small working buffer: the back holds the row to emit next, the front
    /// the in-progress candidate for the current user key.
    item_buffer: VecDeque<StateStoreKeyedRow>,
}
1359
1360impl<R: RangeKv> RangeKvStateStoreRevIter<R> {
1361 pub fn new(
1362 inner: batched_iter::Iter<R>,
1363 epoch: HummockEpoch,
1364 is_inclusive_epoch: bool,
1365 ) -> Self {
1366 Self {
1367 inner,
1368 epoch,
1369 is_inclusive_epoch,
1370 item_buffer: VecDeque::default(),
1371 }
1372 }
1373}
1374
1375impl<R: RangeKv> StateStoreIter for RangeKvStateStoreRevIter<R> {
1376 async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
1377 self.next_inner()?;
1378 Ok(self
1379 .item_buffer
1380 .back()
1381 .map(|(key, value)| (key.to_ref(), value.as_ref())))
1382 }
1383}
1384
impl<R: RangeKv> RangeKvStateStoreRevIter<R> {
    /// Advances the reverse scan, leaving the next row to emit at the back of
    /// `item_buffer` (buffer left empty when exhausted).
    ///
    /// The front of the buffer tracks the in-progress candidate for the user
    /// key currently being scanned; a later-arriving entry for the same user
    /// key supersedes the buffered one, so the candidate converges to the
    /// version that takes precedence before the scan moves to the next key.
    fn next_inner(&mut self) -> StorageResult<()> {
        // Drop the row handed out by the previous `try_next` call.
        self.item_buffer.pop_back();
        while let Some((key, value)) = self.inner.next()? {
            let epoch = key.epoch_with_gap.pure_epoch();
            // Skip versions invisible at the read epoch.
            if epoch > self.epoch {
                continue;
            }
            if epoch == self.epoch && !self.is_inclusive_epoch {
                continue;
            }

            let v = match value {
                Some(v) => v,
                None => {
                    // Tombstone: if it supersedes the buffered candidate for
                    // the same user key, discard that candidate. (At this
                    // point the buffer holds at most the single in-progress
                    // candidate, so `clear` removes only it.)
                    if let Some(last_key) = self.item_buffer.front()
                        && key.user_key.as_ref() == last_key.0.user_key.as_ref()
                    {
                        self.item_buffer.clear();
                    }
                    continue;
                }
            };

            if let Some(last_key) = self.item_buffer.front() {
                if key.user_key.as_ref() != last_key.0.user_key.as_ref() {
                    // New user key reached: the previous key's candidate is
                    // final; buffer the new candidate and stop.
                    self.item_buffer.push_front((key, v));
                    break;
                } else {
                    // Same user key: this version supersedes the candidate.
                    self.item_buffer.pop_front();
                    self.item_buffer.push_front((key, v));
                }
            } else {
                // First candidate for the current user key.
                self.item_buffer.push_front((key, v));
            }
        }
        Ok(())
    }
}
1424
/// Merges a new-value iterator and an old-value iterator (both over the same
/// key range) into a stream of change-log entries: insert, delete or update.
pub struct RangeKvStateStoreChangeLogIter<R: RangeKv> {
    /// State after the change (keys read at the newer epoch).
    new_value_iter: RangeKvStateStoreIter<R>,
    /// State before the change (keys read at the older epoch).
    old_value_iter: RangeKvStateStoreIter<R>,
    /// Entry returned by the current `try_next` call, if any.
    item_buffer: Option<(TableKey<Bytes>, ChangeLogValue<Bytes>)>,
}
1430
1431impl<R: RangeKv> RangeKvStateStoreChangeLogIter<R> {
1432 fn new(
1433 mut new_value_iter: RangeKvStateStoreIter<R>,
1434 mut old_value_iter: RangeKvStateStoreIter<R>,
1435 ) -> StorageResult<Self> {
1436 new_value_iter.next_inner()?;
1437 old_value_iter.next_inner()?;
1438 Ok(Self {
1439 new_value_iter,
1440 old_value_iter,
1441 item_buffer: None,
1442 })
1443 }
1444}
1445
impl<R: RangeKv> StateStoreIter<StateStoreReadLogItem> for RangeKvStateStoreChangeLogIter<R> {
    /// Sort-merge of the two primed iterators:
    /// - key only in new values  -> `Insert`
    /// - key only in old values  -> `Delete`
    /// - key in both, different  -> `Update`
    /// - key in both, identical  -> skipped (no change)
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreReadLogItemRef<'_>>> {
        loop {
            match (
                &self.new_value_iter.item_buffer,
                &self.old_value_iter.item_buffer,
            ) {
                // Both sides exhausted: the change log ends.
                (None, None) => {
                    self.item_buffer = None;
                    break;
                }
                // Only the new side has the key: it was inserted.
                (Some((key, new_value)), None) => {
                    self.item_buffer = Some((
                        key.user_key.table_key.clone(),
                        ChangeLogValue::Insert(new_value.clone()),
                    ));
                    self.new_value_iter.next_inner()?;
                }
                // Only the old side has the key: it was deleted.
                (None, Some((key, old_value))) => {
                    self.item_buffer = Some((
                        key.user_key.table_key.clone(),
                        ChangeLogValue::Delete(old_value.clone()),
                    ));
                    self.old_value_iter.next_inner()?;
                }
                // Both sides buffered: compare keys to decide which advances.
                (Some((new_value_key, new_value)), Some((old_value_key, old_value))) => {
                    match new_value_key.user_key.cmp(&old_value_key.user_key) {
                        Ordering::Less => {
                            self.item_buffer = Some((
                                new_value_key.user_key.table_key.clone(),
                                ChangeLogValue::Insert(new_value.clone()),
                            ));
                            self.new_value_iter.next_inner()?;
                        }
                        Ordering::Greater => {
                            self.item_buffer = Some((
                                old_value_key.user_key.table_key.clone(),
                                ChangeLogValue::Delete(old_value.clone()),
                            ));
                            self.old_value_iter.next_inner()?;
                        }
                        Ordering::Equal => {
                            // Same key, same value: nothing changed, so skip
                            // this key entirely and keep merging.
                            if new_value == old_value {
                                self.new_value_iter.next_inner()?;
                                self.old_value_iter.next_inner()?;
                                continue;
                            }
                            self.item_buffer = Some((
                                new_value_key.user_key.table_key.clone(),
                                ChangeLogValue::Update {
                                    new_value: new_value.clone(),
                                    old_value: old_value.clone(),
                                },
                            ));
                            self.new_value_iter.next_inner()?;
                            self.old_value_iter.next_inner()?;
                        }
                    }
                }
            };
            break;
        }
        Ok(self
            .item_buffer
            .as_ref()
            .map(|(key, value)| (key.to_ref(), value.to_ref())))
    }
}
1514
#[cfg(test)]
mod tests {
    use risingwave_common::util::epoch::test_epoch;

    use super::*;
    use crate::hummock::iterator::test_utils::{
        iterator_test_table_key_of, iterator_test_value_of,
    };
    use crate::hummock::test_utils::{ReadOptions, *};
    use crate::memory::sled::SledStateStore;

    #[tokio::test]
    async fn test_snapshot_isolation_memory() {
        let state_store = MemoryStateStore::new();
        test_snapshot_isolation_inner(state_store).await;
    }

    #[cfg(not(madsim))]
    #[tokio::test]
    async fn test_snapshot_isolation_sled() {
        let state_store = SledStateStore::new_temp();
        test_snapshot_isolation_inner(state_store).await;
    }

    /// Writes two epochs' worth of data and verifies that scans and point
    /// gets at each epoch observe exactly the versions visible at that epoch.
    async fn test_snapshot_isolation_inner(state_store: RangeKvStateStore<impl RangeKv>) {
        // Epoch 0: a = v1, b = v1.
        state_store
            .ingest_batch(
                vec![
                    (
                        TableKey(Bytes::from(b"a".to_vec())),
                        StorageValue::new_put(b"v1".to_vec()),
                    ),
                    (
                        TableKey(Bytes::from(b"b".to_vec())),
                        StorageValue::new_put(b"v1".to_vec()),
                    ),
                ],
                vec![],
                0,
                Default::default(),
            )
            .unwrap();
        // Epoch 1: a updated to v2, b deleted.
        state_store
            .ingest_batch(
                vec![
                    (
                        TableKey(Bytes::from(b"a".to_vec())),
                        StorageValue::new_put(b"v2".to_vec()),
                    ),
                    (
                        TableKey(Bytes::from(b"b".to_vec())),
                        StorageValue::new_delete(),
                    ),
                ],
                vec![],
                test_epoch(1),
                Default::default(),
            )
            .unwrap();
        // Scan at epoch 0 sees both keys with v1.
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    0,
                    TableId::default(),
                    None,
                )
                .unwrap(),
            vec![
                (
                    FullKey::for_test(Default::default(), Bytes::from("a"), 0)
                        .encode()
                        .into(),
                    b"v1".to_vec().into()
                ),
                (
                    FullKey::for_test(Default::default(), Bytes::from("b"), 0)
                        .encode()
                        .into(),
                    b"v1".to_vec().into()
                )
            ]
        );
        // A row limit of 1 truncates the scan after the first key.
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    0,
                    TableId::default(),
                    Some(1),
                )
                .unwrap(),
            vec![(
                FullKey::for_test(Default::default(), b"a".to_vec(), 0)
                    .encode()
                    .into(),
                b"v1".to_vec().into()
            )]
        );
        // Scan at epoch 1 sees the updated `a`; the deleted `b` is gone.
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    test_epoch(1),
                    TableId::default(),
                    None,
                )
                .unwrap(),
            vec![(
                FullKey::for_test(Default::default(), b"a".to_vec(), test_epoch(1))
                    .encode()
                    .into(),
                b"v2".to_vec().into()
            )]
        );
        // Point gets at epoch 0.
        assert_eq!(
            state_store
                .get(TableKey(Bytes::from("a")), 0, ReadOptions::default())
                .await
                .unwrap(),
            Some(Bytes::from("v1"))
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"b")),
                    0,
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            Some(b"v1".to_vec().into())
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"c")),
                    0,
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            None
        );
        // Point gets at epoch 1.
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"a")),
                    test_epoch(1),
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            Some(b"v2".to_vec().into())
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::from("b")),
                    test_epoch(1),
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            None
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::from("c")),
                    test_epoch(1),
                    ReadOptions::default()
                )
                .await
                .unwrap(),
            None
        );
    }

    #[tokio::test]
    async fn test_iter_log_memory() {
        let state_store = MemoryStateStore::new();
        test_iter_log_inner(state_store).await;
    }

    #[cfg(not(madsim))]
    #[tokio::test]
    async fn test_iter_log_sled() {
        let state_store = SledStateStore::new_temp();
        test_iter_log_inner(state_store).await;
    }

    /// Verifies `iter_log` over single-epoch and cross-epoch ranges:
    /// inserts, updates and deletes must surface as the corresponding
    /// change-log values.
    async fn test_iter_log_inner(state_store: RangeKvStateStore<impl RangeKv>) {
        let table_id = TableId::new(233);
        let epoch1 = test_epoch(1);
        let key_idx = [1, 2, 4];
        let make_key = |i| TableKey(Bytes::from(iterator_test_table_key_of(i)));
        let make_value = |i| Bytes::from(iterator_test_value_of(i));
        state_store
            .ingest_batch(
                key_idx
                    .iter()
                    .map(|i| (make_key(*i), StorageValue::new_put(make_value(*i))))
                    .collect(),
                vec![],
                epoch1,
                table_id,
            )
            .unwrap();
        {
            // Epoch 1 alone: everything is an insert.
            let mut iter = state_store
                .iter_log(
                    (epoch1, epoch1),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            for i in key_idx {
                let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(make_key(i).to_ref(), iter_key);
                assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }

        // Epoch 2: update key 1, delete key 2, insert key 3.
        let epoch2 = test_epoch(2);
        state_store
            .ingest_batch(
                vec![
                    (make_key(1), StorageValue::new_put(make_value(12))),
                    (make_key(2), StorageValue::new_delete()),
                    (make_key(3), StorageValue::new_put(make_value(3))),
                ],
                vec![],
                epoch2,
                table_id,
            )
            .unwrap();

        {
            // Epoch 2 alone: update, delete and insert are all reported.
            let expected = vec![
                (
                    make_key(1),
                    ChangeLogValue::Update {
                        new_value: make_value(12),
                        old_value: make_value(1),
                    },
                ),
                (make_key(2), ChangeLogValue::Delete(make_value(2))),
                (make_key(3), ChangeLogValue::Insert(make_value(3))),
            ];
            let mut iter = state_store
                .iter_log(
                    (epoch2, epoch2),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            for (key, change_log_value) in expected {
                let (iter_key, iter_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(
                    key.to_ref(),
                    iter_key,
                    "{:?} {:?}",
                    change_log_value.to_ref(),
                    iter_value
                );
                assert_eq!(change_log_value.to_ref(), iter_value);
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }
        {
            // Epoch 1 alone is unaffected by the later epoch-2 writes.
            let mut iter = state_store
                .iter_log(
                    (epoch1, epoch1),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            for i in key_idx {
                let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(make_key(i).to_ref(), iter_key);
                assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }
        {
            // Epochs 1..=2 combined: key 1 nets to an insert of its final
            // value; key 2 (inserted then deleted) does not appear at all.
            let mut iter = state_store
                .iter_log(
                    (epoch1, epoch2),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
            assert_eq!(make_key(1).to_ref(), iter_key);
            assert_eq!(
                change_value,
                ChangeLogValue::Insert(make_value(12).as_ref())
            );
            for i in [3, 4] {
                let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(make_key(i).to_ref(), iter_key);
                assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }
    }
}