1use std::cmp::Ordering;
16use std::ops::Bound::{Excluded, Included, Unbounded};
17
18use risingwave_common::catalog::TableId;
19use risingwave_common::must_match;
20use risingwave_common::util::epoch::MAX_SPILL_TIMES;
21use risingwave_hummock_sdk::EpochWithGap;
22use risingwave_hummock_sdk::key::{
23 FullKey, SetSlice, TableKeyRange, UserKey, UserKeyRange, bound_table_key_range,
24};
25
26use crate::StateStoreIter;
27use crate::error::StorageResult;
28use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator};
29use crate::hummock::value::HummockValue;
30use crate::hummock::{HummockResult, SstableIterator};
31use crate::monitor::IterLocalMetricsGuard;
32use crate::store::{ChangeLogValue, StateStoreReadLogItem, StateStoreReadLogItemRef};
33
/// Core logic of a change-log iterator: merges a stream of *new* values with a
/// stream of *old* values and yields one [`ChangeLogValue`] per user key whose
/// latest in-range write forms an observable change (Insert / Update / Delete).
struct ChangeLogIteratorInner<
    NI: HummockIterator<Direction = Forward>,
    OI: HummockIterator<Direction = Forward>,
> {
    /// Iterator over the new (post-write) values.
    new_value_iter: NI,
    /// Iterator over the old (pre-write) values.
    old_value_iter: OI,
    /// Upper bound (inclusive) of the epoch range to read, compared via `pure_epoch`.
    max_epoch: u64,
    /// Lower bound (inclusive) of the epoch range to read.
    min_epoch: u64,
    /// User-key range restriction; an `Excluded` start bound is not supported (see `rewind`).
    key_range: UserKeyRange,

    /// Full key of the current position (buffer reused across advances via `set`).
    curr_key: FullKey<Vec<u8>>,
    /// Payload of the newest in-range `Put` for `curr_key`; cleared when the newest op is a delete.
    new_value: Vec<u8>,
    /// Whether the newest in-range write for `curr_key` is a `Delete`.
    is_new_value_delete: bool,

    /// Whether `old_value_iter` is currently positioned at the matching old value
    /// for `curr_key` (set by `try_advance_to_next_valid`).
    is_old_value_set: bool,

    /// Whether the iterator currently points at a valid position.
    is_current_pos_valid: bool,
}
68
impl<NI: HummockIterator<Direction = Forward>, OI: HummockIterator<Direction = Forward>>
    ChangeLogIteratorInner<NI, OI>
{
    /// Create an iterator over `(min_epoch, max_epoch)` (both inclusive) and `key_range`.
    /// The iterator is not positioned until `rewind` is called.
    fn new(
        (min_epoch, max_epoch): (u64, u64),
        key_range: UserKeyRange,
        new_value_iter: NI,
        old_value_iter: OI,
    ) -> Self {
        Self {
            new_value_iter,
            old_value_iter,
            min_epoch,
            max_epoch,
            key_range,

            curr_key: FullKey::default(),
            new_value: vec![],
            is_new_value_delete: false,
            is_old_value_set: false,
            is_current_pos_valid: false,
        }
    }

    /// Position both underlying iterators at the start of `key_range` and advance to the
    /// first change-log value. Afterwards check `is_valid` before reading.
    pub async fn rewind(&mut self) -> HummockResult<()> {
        match &self.key_range.0 {
            Included(begin_key) => {
                // Seek with `max_epoch` (plus max spill gap) so the newest
                // version at or after `begin_key` is reachable.
                let full_key = FullKey {
                    user_key: begin_key.as_ref(),
                    epoch_with_gap: EpochWithGap::new(self.max_epoch, MAX_SPILL_TIMES),
                };
                self.new_value_iter.seek(full_key).await?;
                self.old_value_iter.seek(full_key).await?;
            }
            Excluded(_) => unimplemented!("excluded begin key is not supported"),
            Unbounded => {
                self.new_value_iter.rewind().await?;
                self.old_value_iter.rewind().await?;
            }
        };

        self.try_advance_to_next_change_log_value().await?;
        Ok(())
    }

    /// Advance to the next user key that carries an observable change-log value.
    pub async fn next(&mut self) -> HummockResult<()> {
        self.try_advance_to_next_change_log_value().await
    }

    /// Whether the iterator currently points at a readable (key, log value) pair.
    pub fn is_valid(&self) -> bool {
        self.is_current_pos_valid
    }

    /// The change-log value at the current position. Only call when `is_valid`.
    ///
    /// - newest write is a delete and an old value exists  -> `Delete(old)`
    /// - newest write is a put and an old value exists     -> `Update { new, old }`
    /// - newest write is a put and no old value exists     -> `Insert(new)`
    ///
    /// (A delete with no old value is skipped during advancing, see `has_log_value`.)
    pub fn log_value(&self) -> ChangeLogValue<&[u8]> {
        if self.is_new_value_delete {
            ChangeLogValue::Delete(
                self.old_value()
                    .expect("should have old value when new value is delete"),
            )
        } else {
            match self.old_value() {
                Some(old_value) => ChangeLogValue::Update {
                    new_value: self.new_value.as_slice(),
                    old_value,
                },
                None => ChangeLogValue::Insert(self.new_value.as_slice()),
            }
        }
    }

    /// The user key at the current position. Only call when `is_valid`.
    pub fn key(&self) -> UserKey<&[u8]> {
        self.curr_key.user_key.as_ref()
    }
}
145
impl<NI: HummockIterator<Direction = Forward>, OI: HummockIterator<Direction = Forward>>
    ChangeLogIteratorInner<NI, OI>
{
    /// Keep advancing until either the iterator becomes invalid or the current
    /// position carries an observable log value (skips keys whose net effect is
    /// a delete with no old value, i.e. an insert+delete within the epoch range).
    async fn try_advance_to_next_change_log_value(&mut self) -> HummockResult<()> {
        loop {
            self.try_advance_to_next_valid().await?;
            if !self.is_valid() {
                break;
            }
            if self.has_log_value() {
                break;
            } else {
                continue;
            }
        }
        Ok(())
    }

    /// Whether `user_key` lies beyond the end bound of `key_range`.
    fn user_key_out_of_range(&self, user_key: UserKey<&[u8]>) -> bool {
        match &self.key_range.1 {
            Included(end_key) => user_key > end_key.as_ref(),
            Excluded(end_key) => user_key >= end_key.as_ref(),
            Unbounded => false,
        }
    }

    /// Advance `new_value_iter` to the next entry whose epoch is inside
    /// `[min_epoch, max_epoch]` and whose user key is inside `key_range`, then
    /// snapshot that entry into `curr_key` / `new_value` / `is_new_value_delete`.
    /// Leaves `is_current_pos_valid == false` when no such entry exists.
    async fn advance_to_valid_key(&mut self) -> HummockResult<()> {
        self.is_current_pos_valid = false;
        loop {
            if !self.new_value_iter.is_valid() {
                return Ok(());
            }

            let key = self.new_value_iter.key();

            // Skip versions outside the epoch range; later versions of the same
            // key (or other keys) may still qualify, so keep advancing.
            if !self.is_valid_epoch(key.epoch_with_gap) {
                self.new_value_iter.next().await?;
                continue;
            }

            // Keys are yielded in ascending order, so once past the end bound
            // nothing further can match.
            if self.user_key_out_of_range(key.user_key) {
                return Ok(());
            }

            break;
        }

        debug_assert!(self.new_value_iter.is_valid());
        debug_assert!(self.is_valid_epoch(self.new_value_iter.key().epoch_with_gap));
        debug_assert!(!self.user_key_out_of_range(self.new_value_iter.key().user_key));
        self.is_current_pos_valid = true;
        // Copy the current full key into the reusable buffer.
        self.curr_key.set(self.new_value_iter.key());
        match self.new_value_iter.value() {
            HummockValue::Put(val) => {
                self.new_value.set(val);
                self.is_new_value_delete = false;
            }
            HummockValue::Delete => {
                self.new_value.clear();
                self.is_new_value_delete = true;
            }
        }

        Ok(())
    }

    /// Advance `new_value_iter` past every in-range version of `curr_key`'s user key
    /// and return the oldest in-range epoch seen for it. On return the iterator is
    /// positioned at the first entry of a different user key (or out of epoch range,
    /// or exhausted).
    async fn advance_to_find_oldest_epoch(&mut self) -> HummockResult<EpochWithGap> {
        let mut ret = self.curr_key.epoch_with_gap;
        debug_assert!(self.is_valid_epoch(ret));
        self.new_value_iter.next().await?;
        loop {
            if !self.new_value_iter.is_valid() {
                break;
            }
            let key = self.new_value_iter.key();
            match self.curr_key.user_key.as_ref().cmp(&key.user_key) {
                Ordering::Less => {
                    // Moved on to a later user key: done with `curr_key`.
                    break;
                }
                Ordering::Equal => {
                    // Versions of the same key arrive newest-first, so epochs
                    // must be strictly decreasing.
                    assert!(ret > key.epoch_with_gap);
                    if !self.is_valid_epoch(key.epoch_with_gap) {
                        // Below `min_epoch`: older versions are out of range.
                        debug_assert!(self.min_epoch > key.epoch_with_gap.pure_epoch());
                        break;
                    }
                    ret = key.epoch_with_gap;
                    self.new_value_iter.next().await?;
                    continue;
                }
                Ordering::Greater => {
                    unreachable!(
                        "hummock iterator advance to a prev key: {:?} {:?}",
                        self.curr_key,
                        self.new_value_iter.key()
                    );
                }
            }
        }
        debug_assert!(self.is_valid_epoch(ret));

        Ok(ret)
    }

    /// Advance to the next candidate key and align `old_value_iter` with it:
    /// the matching old value is the one written at exactly the oldest in-range
    /// new-value epoch (the pre-image of the first in-range write).
    async fn try_advance_to_next_valid(&mut self) -> HummockResult<()> {
        self.advance_to_valid_key().await?;

        if !self.is_current_pos_valid {
            return Ok(());
        }

        let oldest_epoch = self.advance_to_find_oldest_epoch().await?;

        self.is_old_value_set = false;
        loop {
            if !self.old_value_iter.is_valid() {
                break;
            }

            let old_value_iter_key = self.old_value_iter.key();
            match self
                .curr_key
                .user_key
                .as_ref()
                .cmp(&old_value_iter_key.user_key.as_ref())
            {
                Ordering::Less => {
                    // Old-value iterator is already past our key: no old value.
                    break;
                }
                Ordering::Equal => match old_value_iter_key.epoch_with_gap.cmp(&oldest_epoch) {
                    Ordering::Less => {
                        // Older than the oldest in-range write: it must predate
                        // `min_epoch`, otherwise the log data is inconsistent.
                        assert!(
                            old_value_iter_key.epoch_with_gap.pure_epoch() < self.min_epoch,
                            "there should not be old value between oldest new_value and min_epoch. \
                            new value key: {:?}, oldest epoch: {:?}, min epoch: {:?}, old value epoch: {:?}",
                            self.curr_key,
                            oldest_epoch,
                            self.min_epoch,
                            old_value_iter_key.epoch_with_gap
                        );
                        break;
                    }
                    Ordering::Equal => {
                        // Exact match: this is the pre-image for `curr_key`.
                        self.is_old_value_set = true;
                        break;
                    }
                    Ordering::Greater => {
                        // Newer old value: skip towards `oldest_epoch`.
                        self.old_value_iter.next().await?;
                        continue;
                    }
                },
                Ordering::Greater => {
                    // Old-value iterator is on an earlier user key: catch up.
                    self.old_value_iter.next().await?;
                    continue;
                }
            }
        }

        Ok(())
    }

    /// Whether `epoch` (by pure epoch) falls within `[min_epoch, max_epoch]`.
    fn is_valid_epoch(&self, epoch: EpochWithGap) -> bool {
        let epoch = epoch.pure_epoch();
        self.min_epoch <= epoch && epoch <= self.max_epoch
    }

    /// The old value aligned with `curr_key`, if any. Old-value entries are
    /// always `Put`s (enforced by `must_match`).
    fn old_value(&self) -> Option<&[u8]> {
        if self.is_old_value_set {
            debug_assert!(self.old_value_iter.is_valid());
            debug_assert_eq!(
                self.old_value_iter.key().user_key,
                self.curr_key.user_key.as_ref()
            );
            Some(must_match!(self.old_value_iter.value(), HummockValue::Put(val) => val))
        } else {
            None
        }
    }

    /// Whether the current position yields an observable change: anything except
    /// a delete with no old value (i.e. key inserted and deleted within range).
    fn has_log_value(&self) -> bool {
        debug_assert!(self.is_current_pos_valid);
        !self.is_new_value_delete || self.is_old_value_set
    }
}
347
impl Drop for ChangeLogIterator {
    fn drop(&mut self) {
        // Flush the local statistics accumulated by both underlying iterators
        // into the metrics guard before the iterator is destroyed.
        self.inner
            .new_value_iter
            .collect_local_statistic(&mut self.stats_guard.local_stats);
        self.inner
            .old_value_iter
            .collect_local_statistic(&mut self.stats_guard.local_stats);
    }
}
358
/// Public change-log iterator backed by merged SSTable iterators for the
/// new-value and old-value streams.
pub struct ChangeLogIterator {
    /// Core merge/advance logic shared with tests.
    inner: ChangeLogIteratorInner<MergeIterator<SstableIterator>, MergeIterator<SstableIterator>>,
    /// Whether `try_next` has been called at least once; the first call must not
    /// advance `inner` because `new`/`rewind` already positioned it.
    initial_read: bool,
    /// Metrics guard into which iterator statistics are collected on drop.
    stats_guard: IterLocalMetricsGuard,
}
364
365impl ChangeLogIterator {
366 pub async fn new(
367 epoch_range: (u64, u64),
368 table_key_range: TableKeyRange,
369 new_value_iter: MergeIterator<SstableIterator>,
370 old_value_iter: MergeIterator<SstableIterator>,
371 table_id: TableId,
372 stats_guard: IterLocalMetricsGuard,
373 ) -> HummockResult<Self> {
374 let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
375 let (start_bound, end_bound) = (
376 user_key_range_ref.0.map(|key| key.cloned()),
377 user_key_range_ref.1.map(|key| key.cloned()),
378 );
379 let mut inner = ChangeLogIteratorInner::new(
380 epoch_range,
381 (start_bound, end_bound),
382 new_value_iter,
383 old_value_iter,
384 );
385 inner.rewind().await?;
386 Ok(Self {
387 inner,
388 initial_read: false,
389 stats_guard,
390 })
391 }
392}
393
394impl StateStoreIter<StateStoreReadLogItem> for ChangeLogIterator {
395 async fn try_next(&mut self) -> StorageResult<Option<StateStoreReadLogItemRef<'_>>> {
396 if !self.initial_read {
397 self.initial_read = true;
398 } else {
399 self.inner.next().await?;
400 }
401 if self.inner.is_valid() {
402 Ok(Some((self.inner.key().table_key, self.inner.log_value())))
403 } else {
404 Ok(None)
405 }
406 }
407}
408
409#[cfg(any(test, feature = "test"))]
410pub mod test_utils {
411 use std::collections::HashMap;
412
413 use bytes::Bytes;
414 use rand::{Rng, RngCore, rng as thread_rng};
415 use risingwave_common::util::epoch::{EpochPair, MAX_EPOCH, test_epoch};
416 use risingwave_hummock_sdk::key::TableKey;
417
418 use crate::hummock::iterator::test_utils::iterator_test_table_key_of;
419 use crate::mem_table::KeyOp;
420 use crate::store::{InitOptions, LocalStateStore, SealCurrentEpochOptions};
421
    /// Test change-log data: one entry per epoch, carrying the epoch number and
    /// the list of (key, operation) pairs applied in that epoch.
    pub type TestLogDataType = Vec<(u64, Vec<(TableKey<Bytes>, KeyOp)>)>;
423
424 pub fn gen_test_data(
425 epoch_count: usize,
426 key_count: usize,
427 skip_ratio: f64,
428 delete_ratio: f64,
429 ) -> TestLogDataType {
430 let mut store: HashMap<TableKey<Bytes>, Bytes> = HashMap::new();
431 let mut rng = thread_rng();
432 let mut logs = Vec::new();
433 for epoch_idx in 1..=(epoch_count - 1) {
434 let mut epoch_logs = Vec::new();
435 let epoch = test_epoch(epoch_idx as _);
436 for key_idx in 0..key_count {
437 if rng.random_bool(skip_ratio) {
438 continue;
439 }
440 let key = TableKey(Bytes::from(iterator_test_table_key_of(key_idx)));
441 if rng.random_bool(delete_ratio) {
442 if let Some(prev_value) = store.remove(&key) {
443 epoch_logs.push((key, KeyOp::Delete(prev_value)));
444 }
445 } else {
446 let value = Bytes::copy_from_slice(rng.next_u64().to_string().as_bytes());
447 let prev_value = store.get(&key);
448 if let Some(prev_value) = prev_value {
449 epoch_logs.push((
450 key.clone(),
451 KeyOp::Update((prev_value.clone(), value.clone())),
452 ));
453 } else {
454 epoch_logs.push((key.clone(), KeyOp::Insert(value.clone())));
455 }
456 store.insert(key, value);
457 }
458 }
459 logs.push((epoch, epoch_logs));
460 }
461 {
463 let mut epoch_logs = Vec::new();
464 let epoch = test_epoch(epoch_count as _);
465 for (key, value) in store {
466 epoch_logs.push((key, KeyOp::Delete(value)));
467 }
468 logs.push((epoch, epoch_logs));
469 }
470 logs
471 }
472
    /// Apply generated test log data to a local state store, epoch by epoch.
    ///
    /// Initializes the store at the first epoch, flushes and seals between
    /// epochs, and optionally issues `try_flush` after each op with probability
    /// `try_flush_ratio`. Finally seals the last epoch up to `MAX_EPOCH`.
    ///
    /// NOTE: panics (index out of bounds) if `log_data` is empty.
    pub async fn apply_test_log_data(
        log_data: TestLogDataType,
        state_store: &mut impl LocalStateStore,
        try_flush_ratio: f64,
    ) {
        let mut rng = thread_rng();
        let first_epoch = log_data[0].0;
        for (epoch, epoch_logs) in log_data {
            if epoch == first_epoch {
                // First epoch: initialize the local state store.
                state_store
                    .init(InitOptions {
                        epoch: EpochPair::new_test_epoch(epoch),
                    })
                    .await
                    .unwrap();
            } else {
                // Subsequent epochs: persist the previous epoch and advance.
                state_store.flush().await.unwrap();
                state_store.seal_current_epoch(
                    epoch,
                    SealCurrentEpochOptions {
                        table_watermarks: None,
                        switch_op_consistency_level: None,
                    },
                );
            }
            for (key, op) in epoch_logs {
                match op {
                    KeyOp::Insert(value) => {
                        state_store.insert(key, value, None).unwrap();
                    }
                    KeyOp::Delete(old_value) => {
                        state_store.delete(key, old_value).unwrap();
                    }
                    KeyOp::Update((old_value, value)) => {
                        state_store.insert(key, value, Some(old_value)).unwrap();
                    }
                }
                // Randomly exercise the try_flush path mid-epoch.
                if rng.random_bool(try_flush_ratio) {
                    state_store.try_flush().await.unwrap();
                }
            }
        }
        // Persist the final epoch and seal it up to MAX_EPOCH.
        state_store.flush().await.unwrap();
        state_store.seal_current_epoch(
            MAX_EPOCH,
            SealCurrentEpochOptions {
                table_watermarks: None,
                switch_op_consistency_level: None,
            },
        );
    }
524}
525
526#[cfg(test)]
527mod tests {
528 use std::collections::BTreeMap;
529 use std::ops::Bound::Unbounded;
530
531 use bytes::Bytes;
532 use itertools::Itertools;
533 use risingwave_common::bitmap::Bitmap;
534 use risingwave_common::catalog::TableId;
535 use risingwave_common::hash::VirtualNode;
536 use risingwave_common::util::epoch::test_epoch;
537 use risingwave_hummock_sdk::EpochWithGap;
538 use risingwave_hummock_sdk::key::{TableKey, UserKey};
539
540 use crate::hummock::iterator::MergeIterator;
541 use crate::hummock::iterator::change_log::ChangeLogIteratorInner;
542 use crate::hummock::iterator::change_log::test_utils::{
543 TestLogDataType, apply_test_log_data, gen_test_data,
544 };
545 use crate::hummock::iterator::test_utils::{
546 iterator_test_table_key_of, iterator_test_value_of,
547 };
548 use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator, MemTableStore};
549 use crate::memory::MemoryStateStore;
550 use crate::store::{
551 CHECK_BYTES_EQUAL, ChangeLogValue, NewLocalOptions, OpConsistencyLevel, ReadLogOptions,
552 StateStoreReadLog,
553 };
554 use crate::{StateStore, StateStoreIter};
555
556 #[tokio::test]
557 async fn test_empty() {
558 let table_id = TableId::new(233);
559 let epoch = EpochWithGap::new_from_epoch(test_epoch(1));
560 let empty = BTreeMap::new();
561 let new_value_iter = MemTableHummockIterator::new(&empty, epoch, table_id);
562 let old_value_iter = MemTableHummockIterator::new(&empty, epoch, table_id);
563 let mut iter = ChangeLogIteratorInner::new(
564 (epoch.pure_epoch(), epoch.pure_epoch()),
565 (Unbounded, Unbounded),
566 new_value_iter,
567 old_value_iter,
568 );
569 iter.rewind().await.unwrap();
570 assert!(!iter.is_valid());
571 }
572
    /// Pure inserts with no old values must all surface as `ChangeLogValue::Insert`.
    #[tokio::test]
    async fn test_append_only() {
        let table_id = TableId::new(233);

        let count = 100;
        let kvs = (0..count)
            .map(|i| {
                (
                    TableKey(Bytes::from(iterator_test_table_key_of(i))),
                    Bytes::from(iterator_test_value_of(i)),
                )
            })
            .collect_vec();
        // One mem table per kv so the new-value side exercises MergeIterator.
        let mem_tables = kvs
            .iter()
            .map(|(key, value)| {
                let mut t = MemTable::new(OpConsistencyLevel::Inconsistent);
                t.insert(key.clone(), value.clone()).unwrap();
                t
            })
            .collect_vec();
        let epoch = EpochWithGap::new_from_epoch(test_epoch(1));
        let new_value_iter = MergeIterator::new(
            mem_tables
                .iter()
                .map(|mem_table| MemTableHummockIterator::new(&mem_table.buffer, epoch, table_id)),
        );
        // Empty old-value side: every change should be an Insert.
        let empty = BTreeMap::new();
        let old_value_iter = MemTableHummockIterator::new(&empty, epoch, table_id);
        let mut iter = ChangeLogIteratorInner::new(
            (epoch.pure_epoch(), epoch.pure_epoch()),
            (Unbounded, Unbounded),
            new_value_iter,
            old_value_iter,
        );
        iter.rewind().await.unwrap();
        for (key, value) in kvs {
            assert!(iter.is_valid());
            assert_eq!(
                UserKey {
                    table_id,
                    table_key: key.to_ref(),
                },
                iter.key()
            );
            assert_eq!(ChangeLogValue::Insert(value.as_ref()), iter.log_value());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());
    }
623
    /// Deletes paired with existing old values must all surface as `ChangeLogValue::Delete`.
    #[tokio::test]
    async fn test_delete_only() {
        let table_id = TableId::new(233);

        let count = 100;
        let kvs = (0..count)
            .map(|i| {
                (
                    TableKey(Bytes::from(iterator_test_table_key_of(i))),
                    Bytes::from(iterator_test_value_of(i)),
                )
            })
            .collect_vec();
        // New-value side records deletes; old-value side holds the pre-images.
        let mut new_value_memtable = MemTable::new(OpConsistencyLevel::Inconsistent);
        let mut old_value_memtable = MemTable::new(OpConsistencyLevel::Inconsistent);
        for (key, value) in &kvs {
            new_value_memtable
                .delete(key.clone(), Bytes::new())
                .unwrap();
            old_value_memtable
                .insert(key.clone(), value.clone())
                .unwrap();
        }
        let epoch = EpochWithGap::new_from_epoch(test_epoch(1));
        let new_value_iter =
            MemTableHummockIterator::new(&new_value_memtable.buffer, epoch, table_id);
        let old_value_iter =
            MemTableHummockIterator::new(&old_value_memtable.buffer, epoch, table_id);
        let mut iter = ChangeLogIteratorInner::new(
            (epoch.pure_epoch(), epoch.pure_epoch()),
            (Unbounded, Unbounded),
            new_value_iter,
            old_value_iter,
        );
        iter.rewind().await.unwrap();
        for (key, value) in kvs {
            assert!(iter.is_valid());
            assert_eq!(
                UserKey {
                    table_id,
                    table_key: key.to_ref(),
                },
                iter.key()
            );
            assert_eq!(ChangeLogValue::Delete(value.as_ref()), iter.log_value());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());
    }
673
674 fn gen_test_mem_table_store(
675 test_log_data: TestLogDataType,
676 ) -> Vec<(u64, MemTableStore, MemTableStore)> {
677 let mut logs = Vec::new();
678 for (epoch, epoch_logs) in test_log_data {
679 let mut new_values = MemTableStore::new();
680 let mut old_values = MemTableStore::new();
681 for (key, op) in epoch_logs {
682 new_values.insert(key.clone(), op.clone());
683 if let KeyOp::Delete(old_value) | KeyOp::Update((old_value, _)) = op {
684 old_values.insert(key, KeyOp::Insert(old_value));
685 }
686 }
687 logs.push((epoch, new_values, old_values));
688 }
689 logs
690 }
691
    /// Cross-check `ChangeLogIteratorInner` against the memory state store's
    /// `iter_log` for every (start, end) epoch-index pair over randomized data.
    #[tokio::test]
    async fn test_random_data() {
        let table_id = TableId::new(233);
        let epoch_count = 10;
        let state_store = MemoryStateStore::new();
        let mut local = state_store
            .new_local(NewLocalOptions {
                table_id,
                // Log-store-consistent old values so iter_log has pre-images.
                op_consistency_level: OpConsistencyLevel::ConsistentOldValue {
                    check_old_value: CHECK_BYTES_EQUAL.clone(),
                    is_log_store: true,
                },
                table_option: Default::default(),
                is_replicated: false,
                vnodes: Bitmap::ones(VirtualNode::COUNT_FOR_TEST).into(),
            })
            .await;
        let logs = gen_test_data(epoch_count, 10000, 0.05, 0.2);
        assert_eq!(logs.len(), epoch_count);
        // Apply the same log data to both the state store and the mem tables.
        apply_test_log_data(logs.clone(), &mut local, 0.0).await;
        let mem_table_logs = gen_test_mem_table_store(logs.clone());
        assert_eq!(mem_table_logs.len(), epoch_count);
        // Exhaustively test every inclusive epoch sub-range.
        for start_epoch_idx in 0..epoch_count {
            for end_epoch_idx in start_epoch_idx..epoch_count {
                let new_value_iter = MergeIterator::new(mem_table_logs.iter().map(
                    |(epoch, new_value_memtable, _)| {
                        MemTableHummockIterator::new(
                            new_value_memtable,
                            EpochWithGap::new_from_epoch(*epoch),
                            table_id,
                        )
                    },
                ));
                let old_value_iter = MergeIterator::new(mem_table_logs.iter().map(
                    |(epoch, _, old_value_memtable)| {
                        MemTableHummockIterator::new(
                            old_value_memtable,
                            EpochWithGap::new_from_epoch(*epoch),
                            table_id,
                        )
                    },
                ));
                let epoch_range = (logs[start_epoch_idx].0, logs[end_epoch_idx].0);
                let mut change_log_iter = ChangeLogIteratorInner::new(
                    epoch_range,
                    (Unbounded, Unbounded),
                    new_value_iter,
                    old_value_iter,
                );
                change_log_iter.rewind().await.unwrap();
                // The state store's iter_log acts as the reference implementation.
                let mut expected_change_log_iter = state_store
                    .iter_log(
                        epoch_range,
                        (Unbounded, Unbounded),
                        ReadLogOptions { table_id },
                    )
                    .await
                    .unwrap();
                while let Some((key, change_log_value)) =
                    expected_change_log_iter.try_next().await.unwrap()
                {
                    assert!(change_log_iter.is_valid());
                    assert_eq!(
                        change_log_iter.key(),
                        UserKey {
                            table_id,
                            table_key: key,
                        },
                    );
                    assert_eq!(change_log_iter.log_value(), change_log_value);
                    change_log_iter.next().await.unwrap();
                }
                // Both iterators must be exhausted at the same time.
                assert!(!change_log_iter.is_valid());
            }
        }
    }
768}