use std::cmp::Ordering;
use std::collections::{BTreeMap, VecDeque};

use bytes::Bytes;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_common::row::Row;
use risingwave_common::types::Datum;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::safe_epoch_read_table_watermarks_impl;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
use risingwave_hummock_sdk::table_watermark::{
    ReadTableWatermark, TableWatermarks, WatermarkDirection,
};

use super::SkipWatermarkState;
use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, ValueWatermarkColumnSerdeRef};
use crate::hummock::HummockResult;
use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
use crate::hummock::value::HummockValue;
use crate::monitor::StoreLocalStatistic;

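/// An iterator wrapper that skips entries already covered by table watermarks during
/// compaction. Entries whose keys fall on the stale side of the per-vnode watermark of their
/// table are dropped, and statistics about the skipped entries are accumulated so they can be
/// reported back to the compactor.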
pub struct SkipWatermarkIterator<I, S> {
    inner: I,
    state: S,
    /// Stats of all entries skipped by the watermark so far, keyed by table id.
    skipped_entry_table_stats: TableStatsMap,
    /// The table id of the entries currently counted in `last_table_stats`.
    last_table_id: Option<TableId>,
    /// Pending stats of skipped entries that have not yet been flushed into
    /// `skipped_entry_table_stats`.
    last_table_stats: TableStats,
}

impl<I: HummockIterator<Direction = Forward>, S: SkipWatermarkState> SkipWatermarkIterator<I, S> {
    pub fn new(inner: I, state: S) -> Self {
        Self {
            inner,
            state,
            skipped_entry_table_stats: TableStatsMap::default(),
            last_table_id: None,
            last_table_stats: TableStats::default(),
        }
    }

    fn reset_watermark(&mut self) {
        self.state.reset_watermark();
    }

    fn reset_skipped_entry_table_stats(&mut self) {
        self.skipped_entry_table_stats = TableStatsMap::default();
        self.last_table_id = None;
        self.last_table_stats = TableStats::default();
    }

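    /// Advances the inner iterator until it reaches an entry that should not be deleted
    /// according to the watermark state, or until the inner iterator becomes invalid.
    /// Every skipped entry is subtracted from the per-table stats so the compactor can
    /// account for the dropped key/value bytes.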
    async fn advance_key_and_watermark(&mut self) -> HummockResult<()> {
        while self.inner.is_valid() {
            if !self
                .state
                .should_delete(&self.inner.key(), self.inner.value())
            {
                break;
            }

            if self
                .last_table_id
                .is_none_or(|last_table_id| last_table_id != self.inner.key().user_key.table_id)
            {
                self.add_last_table_stats();
                self.last_table_id = Some(self.inner.key().user_key.table_id);
            }
            self.last_table_stats.total_key_count -= 1;
            self.last_table_stats.total_key_size -= self.inner.key().encoded_len() as i64;
            self.last_table_stats.total_value_size -= self.inner.value().encoded_len() as i64;

            self.inner.next().await?;
        }
        self.add_last_table_stats();
        Ok(())
    }

    fn add_last_table_stats(&mut self) {
        let Some(last_table_id) = self.last_table_id.take() else {
            return;
        };
        let delta = std::mem::take(&mut self.last_table_stats);
        let e = self
            .skipped_entry_table_stats
            .entry(last_table_id)
            .or_default();
        e.total_key_count += delta.total_key_count;
        e.total_key_size += delta.total_key_size;
        e.total_value_size += delta.total_value_size;
    }
}

impl<I: HummockIterator<Direction = Forward>, S: SkipWatermarkState> HummockIterator
    for SkipWatermarkIterator<I, S>
{
    type Direction = Forward;

    async fn next(&mut self) -> HummockResult<()> {
        self.inner.next().await?;
        if self.state.has_watermark() {
            // Only try to skip when there are still remaining watermarks to apply.
            self.advance_key_and_watermark().await?;
        }
        Ok(())
    }

    fn key(&self) -> FullKey<&[u8]> {
        self.inner.key()
    }

    fn value(&self) -> HummockValue<&[u8]> {
        self.inner.value()
    }

    fn is_valid(&self) -> bool {
        self.inner.is_valid()
    }

    async fn rewind(&mut self) -> HummockResult<()> {
        self.reset_watermark();
        self.reset_skipped_entry_table_stats();
        self.inner.rewind().await?;
        self.advance_key_and_watermark().await?;
        Ok(())
    }

    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
        self.reset_watermark();
        self.reset_skipped_entry_table_stats();
        self.inner.seek(key).await?;
        self.advance_key_and_watermark().await?;
        Ok(())
    }

    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
        add_table_stats_map(
            &mut stats.skipped_by_watermark_table_stats,
            &self.skipped_entry_table_stats,
        );
        self.inner.collect_local_statistic(stats)
    }

    fn value_meta(&self) -> ValueMeta {
        self.inner.value_meta()
    }
}
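
/// Watermark state for tables whose watermark column is the first (prefix) column of the
/// primary key. Watermarks are kept as encoded key prefixes and compared directly against the
/// serialized key, ordered by `(table_id, vnode)` to match the iteration order of a forward
/// compaction iterator.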
pub struct PkPrefixSkipWatermarkState {
    watermarks: BTreeMap<TableId, ReadTableWatermark>,
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Bytes)>,
}

impl SkipWatermarkState for PkPrefixSkipWatermarkState {
    #[inline(always)]
    fn has_watermark(&self) -> bool {
        !self.remain_watermarks.is_empty()
    }

    fn should_delete(&mut self, key: &FullKey<&[u8]>, value: HummockValue<&[u8]>) -> bool {
        if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            let key_table_id = key.user_key.table_id;
            let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
            match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
                Ordering::Less => {
                    return false;
                }
                Ordering::Equal => {
                    return direction.key_filter_by_watermark(inner_key, watermark);
                }
                Ordering::Greater => {
                    // The key has moved past the front watermark. Pop expired watermarks and
                    // re-check against the newly advanced front.
                    return self.advance_watermark(key, value);
                }
            }
        }
        false
    }

    fn reset_watermark(&mut self) {
        self.remain_watermarks = self
            .watermarks
            .iter()
            .flat_map(|(table_id, read_watermarks)| {
                read_watermarks
                    .vnode_watermarks
                    .iter()
                    .map(|(vnode, watermarks)| {
                        (
                            *table_id,
                            *vnode,
                            read_watermarks.direction,
                            watermarks.clone(),
                        )
                    })
            })
            .collect();
    }

    fn advance_watermark(&mut self, key: &FullKey<&[u8]>, _value: HummockValue<&[u8]>) -> bool {
        // Pop watermarks whose `(table_id, vnode)` is smaller than that of the current key, then
        // decide whether the key is covered by the first watermark that is not smaller.
        let key_table_id = key.user_key.table_id;
        let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
        while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
                Ordering::Less => {
                    self.remain_watermarks.pop_front();
                    continue;
                }
                Ordering::Equal => {
                    match direction {
                        WatermarkDirection::Ascending => {
                            match inner_key.cmp(watermark.as_ref()) {
                                Ordering::Less => {
                                    return true;
                                }
                                Ordering::Equal | Ordering::Greater => {
                                    // The key has reached the watermark, so all later keys of
                                    // this vnode are retained and the watermark can be dropped.
                                    self.remain_watermarks.pop_front();
                                    #[cfg(debug_assertions)]
                                    {
                                        if let Some((next_table_id, next_vnode, _, _)) =
                                            self.remain_watermarks.front()
                                        {
                                            assert!(
                                                (next_table_id, next_vnode)
                                                    > (&key_table_id, &key_vnode)
                                            );
                                        }
                                    }
                                    return false;
                                }
                            }
                        }
                        WatermarkDirection::Descending => {
                            return match inner_key.cmp(watermark.as_ref()) {
                                Ordering::Less | Ordering::Equal => false,
                                Ordering::Greater => true,
                            };
                        }
                    }
                }
                Ordering::Greater => {
                    return false;
                }
            }
        }
        false
    }
}

impl PkPrefixSkipWatermarkState {
    pub fn new(watermarks: BTreeMap<TableId, ReadTableWatermark>) -> Self {
        Self {
            remain_watermarks: VecDeque::new(),
            watermarks,
        }
    }

    pub fn from_safe_epoch_watermarks(
        safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
    ) -> Self {
        let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
        Self::new(watermarks)
    }
}

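/// Watermark state for tables whose watermark column is *not* the first primary-key column.
/// Keys are deserialized with the table's pk serde (looked up from the compaction catalog) so
/// that the watermark column inside the primary key can be extracted and compared against the
/// watermark `Datum`.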
pub struct NonPkPrefixSkipWatermarkState {
    watermarks: BTreeMap<TableId, ReadTableWatermark>,
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Datum)>,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,

    last_serde: Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
    last_table_id: Option<TableId>,
}

impl NonPkPrefixSkipWatermarkState {
    pub fn new(
        watermarks: BTreeMap<TableId, ReadTableWatermark>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> Self {
        Self {
            remain_watermarks: VecDeque::new(),
            watermarks,
            compaction_catalog_agent_ref,
            last_serde: None,
            last_table_id: None,
        }
    }

    pub fn from_safe_epoch_watermarks(
        safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> Self {
        let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
        Self::new(watermarks, compaction_catalog_agent_ref)
    }
}

impl SkipWatermarkState for NonPkPrefixSkipWatermarkState {
    #[inline(always)]
    fn has_watermark(&self) -> bool {
        !self.remain_watermarks.is_empty()
    }

    fn should_delete(&mut self, key: &FullKey<&[u8]>, value: HummockValue<&[u8]>) -> bool {
        if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            let key_table_id = key.user_key.table_id;
            {
                // Refresh the cached serde when the iterator crosses a table boundary.
                if self
                    .last_table_id
                    .is_none_or(|last_table_id| last_table_id != key_table_id)
                {
                    self.last_table_id = Some(key_table_id);
                    self.last_serde = self.compaction_catalog_agent_ref.watermark_serde(*table_id);
                }
            }

            let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
            match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
                Ordering::Less => {
                    return false;
                }
                Ordering::Equal => {
                    let (pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk) =
                        self.last_serde.as_ref().unwrap();
                    let row = pk_prefix_serde
                        .deserialize(inner_key)
                        .unwrap_or_else(|_| {
                            panic!("Failed to deserialize pk_prefix inner_key {:?} serde data_types {:?} order_types {:?}", inner_key, pk_prefix_serde.get_data_types(), pk_prefix_serde.get_order_types());
                        });
                    let watermark_col_in_pk = row.datum_at(*watermark_col_idx_in_pk);
                    return direction.datum_filter_by_watermark(
                        watermark_col_in_pk,
                        watermark,
                        watermark_col_serde.get_order_types()[0],
                    );
                }
                Ordering::Greater => {
                    return self.advance_watermark(key, value);
                }
            }
        }
        false
    }

    fn reset_watermark(&mut self) {
        self.remain_watermarks = self
            .watermarks
            .iter()
            .flat_map(|(table_id, read_watermarks)| {
                // Tables without a registered watermark serde are skipped entirely.
                let watermark_serde = self
                    .compaction_catalog_agent_ref
                    .watermark_serde(*table_id)
                    .map(|(_pk_serde, watermark_serde, _watermark_col_idx_in_pk)| watermark_serde);

                read_watermarks
                    .vnode_watermarks
                    .iter()
                    .flat_map(move |(vnode, watermarks)| {
                        let watermark_serde = watermark_serde.as_ref()?;
                        Some((
                            *table_id,
                            *vnode,
                            read_watermarks.direction,
                            {
                                let row = watermark_serde
                                    .deserialize(watermarks)
                                    .unwrap_or_else(|_| {
                                        panic!("Failed to deserialize watermark {:?} serde data_types {:?} order_types {:?}", watermarks, watermark_serde.get_data_types(), watermark_serde.get_order_types());
                                    });
                                row[0].clone()
                            },
                        ))
                    })
            })
            .collect();
    }

    fn advance_watermark(&mut self, key: &FullKey<&[u8]>, _value: HummockValue<&[u8]>) -> bool {
        let key_table_id = key.user_key.table_id;
        let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
        while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
                Ordering::Less => {
                    self.remain_watermarks.pop_front();
                    continue;
                }
                Ordering::Equal => {
                    let (pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk) =
                        self.last_serde.as_ref().unwrap();

                    let row = pk_prefix_serde
                        .deserialize(inner_key)
                        .unwrap_or_else(|_| {
                            panic!("Failed to deserialize pk_prefix inner_key {:?} serde data_types {:?} order_types {:?}", inner_key, pk_prefix_serde.get_data_types(), pk_prefix_serde.get_order_types());
                        });
                    let watermark_col_in_pk = row.datum_at(*watermark_col_idx_in_pk);

                    return direction.datum_filter_by_watermark(
                        watermark_col_in_pk,
                        watermark,
                        watermark_col_serde.get_order_types()[0],
                    );
                }
                Ordering::Greater => {
                    return false;
                }
            }
        }
        false
    }
}

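// The concrete skip-watermark iterators used by compaction. They can also be nested, e.g. a
// `NonPkPrefixSkipWatermarkIterator` wrapped in a `PkPrefixSkipWatermarkIterator`, so that one
// compaction pass applies both kinds of watermarks (see `test_mix_watermark` below).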
pub type PkPrefixSkipWatermarkIterator<I> = SkipWatermarkIterator<I, PkPrefixSkipWatermarkState>;

pub type NonPkPrefixSkipWatermarkIterator<I> =
    SkipWatermarkIterator<I, NonPkPrefixSkipWatermarkState>;

pub type ValueSkipWatermarkIterator<I> = SkipWatermarkIterator<I, ValueSkipWatermarkState>;

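/// Watermark state for tables whose watermark column lives in the *value* rather than in the
/// key. The watermark column is deserialized from the value payload with a
/// `ValueWatermarkColumnSerdeRef` obtained from the compaction catalog and compared against the
/// encoded watermark.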
pub struct ValueSkipWatermarkState {
    pub watermarks: BTreeMap<TableId, ReadTableWatermark>,
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Bytes)>,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    last_serde: Option<ValueWatermarkColumnSerdeRef>,
    last_table_id: Option<TableId>,
}

impl ValueSkipWatermarkState {
    pub fn new(
        watermarks: BTreeMap<TableId, ReadTableWatermark>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> Self {
        Self {
            remain_watermarks: VecDeque::new(),
            watermarks,
            compaction_catalog_agent_ref,
            last_serde: None,
            last_table_id: None,
        }
    }

    pub fn from_safe_epoch_watermarks(
        safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> Self {
        let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
        Self::new(watermarks, compaction_catalog_agent_ref)
    }

    pub fn may_delete(&self, key: &FullKey<&[u8]>) -> bool {
        let table_id = key.user_key.table_id;
        self.watermarks.contains_key(&table_id)
    }
}

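// Note: only `HummockValue::Put` payloads carry a watermark column; deletes, values that fail to
// deserialize, and values without the watermark column (e.g. written under a non-current
// catalog) are conservatively retained.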
impl SkipWatermarkState for ValueSkipWatermarkState {
    #[inline(always)]
    fn has_watermark(&self) -> bool {
        !self.remain_watermarks.is_empty()
    }

    fn should_delete(&mut self, key: &FullKey<&[u8]>, value: HummockValue<&[u8]>) -> bool {
        if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            let key_table_id = key.user_key.table_id;
            let key_vnode = key.user_key.table_key.vnode_part();
            if self
                .last_table_id
                .is_none_or(|last_table_id| last_table_id != key_table_id)
            {
                self.last_table_id = Some(key_table_id);
                self.last_serde = self
                    .compaction_catalog_agent_ref
                    .value_watermark_serde(*table_id);
            }
            match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
                Ordering::Less => {
                    return false;
                }
                Ordering::Equal => {
                    let HummockValue::Put(value) = value else {
                        return false;
                    };
                    let Some(ref last_serde) = self.last_serde else {
                        return false;
                    };
                    let Ok(watermark_column_value) = last_serde.deserialize(value) else {
                        tracing::error!(
                            ?table_id,
                            ?vnode,
                            ?value,
                            "Failed to deserialize watermark column"
                        );
                        return false;
                    };
                    let Some(watermark_column_value) = watermark_column_value else {
                        tracing::debug!(
                            ?table_id,
                            ?vnode,
                            ?value,
                            "The specified watermark column was not found in the value, likely due to the use of a non-current catalog. Restarting the compactor should resolve this issue."
                        );
                        return false;
                    };
                    return direction.key_filter_by_watermark(&watermark_column_value, watermark);
                }
                Ordering::Greater => {
                    return self.advance_watermark(key, value);
                }
            }
        }
        false
    }

    fn reset_watermark(&mut self) {
        self.remain_watermarks = self
            .watermarks
            .iter()
            .flat_map(|(table_id, read_watermarks)| {
                read_watermarks
                    .vnode_watermarks
                    .iter()
                    .map(|(vnode, watermarks)| {
                        (
                            *table_id,
                            *vnode,
                            read_watermarks.direction,
                            watermarks.clone(),
                        )
                    })
            })
            .collect();
    }

    fn advance_watermark(&mut self, key: &FullKey<&[u8]>, value: HummockValue<&[u8]>) -> bool {
        let key_table_id = key.user_key.table_id;
        let key_vnode = key.user_key.table_key.vnode_part();
        while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
                Ordering::Less => {
                    self.remain_watermarks.pop_front();
                    continue;
                }
                Ordering::Equal => {
                    let HummockValue::Put(value) = value else {
                        return false;
                    };
                    let Some(ref last_serde) = self.last_serde else {
                        return false;
                    };
                    let Ok(watermark_column_value) = last_serde.deserialize(value) else {
                        tracing::error!(
                            ?table_id,
                            ?vnode,
                            ?value,
                            "Failed to deserialize watermark column."
                        );
                        return false;
                    };
                    let Some(watermark_column_value) = watermark_column_value else {
                        tracing::warn!(
                            ?table_id,
                            ?vnode,
                            ?value,
                            "The specified watermark column was not found in the value, likely due to the use of a non-current catalog. Restarting the compactor should resolve this issue."
                        );
                        return false;
                    };
                    return direction.key_filter_by_watermark(&watermark_column_value, watermark);
                }
                Ordering::Greater => {
                    return false;
                }
            }
        }
        false
    }
}

#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap};
    use std::iter::{empty, once};
    use std::sync::Arc;

    use bytes::Bytes;
    use itertools::Itertools;
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::row::{OwnedRow, RowExt};
    use risingwave_common::types::{DataType, ScalarImpl};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::row_serde::OrderedRowSerde;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_hummock_sdk::EpochWithGap;
    use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey, gen_key_from_str};
    use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, WatermarkDirection};

    use super::PkPrefixSkipWatermarkState;
    use crate::compaction_catalog_manager::{
        CompactionCatalogAgent, FilterKeyExtractorImpl, FullKeyFilterKeyExtractor,
    };
    use crate::hummock::iterator::{
        HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
        NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator,
    };
    use crate::hummock::shared_buffer::shared_buffer_batch::{
        SharedBufferBatch, SharedBufferValue,
    };
    use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode};

    const EPOCH: u64 = test_epoch(1);
    const TABLE_ID: TableId = TableId::new(233);

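    /// Drives `second` (the skip-watermark iterator) and, when present, `first` (a reference
    /// iterator built from pre-filtered data), optionally seeking both to the same key first,
    /// and asserts that they yield exactly the same key/value sequence.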
    async fn assert_iter_eq(
        mut first: Option<impl HummockIterator>,
        mut second: impl HummockIterator,
        seek_key: Option<(usize, usize)>,
    ) {
        if let Some((vnode, key_index)) = seek_key {
            let (seek_key, _) = gen_key_value(vnode, key_index);
            let full_key = FullKey {
                user_key: UserKey {
                    table_id: TABLE_ID,
                    table_key: seek_key,
                },
                epoch_with_gap: EpochWithGap::new_from_epoch(EPOCH),
            };
            if let Some(first) = &mut first {
                first.seek(full_key.to_ref()).await.unwrap();
            }
            second.seek(full_key.to_ref()).await.unwrap()
        } else {
            if let Some(first) = &mut first {
                first.rewind().await.unwrap();
            }
            second.rewind().await.unwrap();
        }

        if let Some(first) = &mut first {
            while first.is_valid() {
                assert!(second.is_valid());
                let first_key = first.key();
                let second_key = second.key();
                assert_eq!(first_key, second_key);
                assert_eq!(first.value(), second.value());
                first.next().await.unwrap();
                second.next().await.unwrap();
            }
        }
        assert!(!second.is_valid());
    }

    fn build_batch(
        pairs: impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)>,
        table_id: TableId,
    ) -> Option<SharedBufferBatch> {
        let pairs: Vec<_> = pairs.collect();
        if pairs.is_empty() {
            None
        } else {
            Some(SharedBufferBatch::for_test(pairs, EPOCH, table_id))
        }
    }

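    /// Computes the expected surviving entries by applying the same per-vnode watermark
    /// predicate directly to the input pairs; used to build the reference iterator.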
    fn filter_with_watermarks(
        iter: impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)>,
        table_watermarks: ReadTableWatermark,
    ) -> impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)> {
        iter.filter(move |(key, _)| {
            if let Some(watermark) = table_watermarks.vnode_watermarks.get(&key.vnode_part()) {
                !table_watermarks
                    .direction
                    .key_filter_by_watermark(key.key_part(), watermark)
            } else {
                true
            }
        })
    }

    fn gen_inner_key(index: usize) -> String {
        format!("key{:5}", index)
    }

    async fn test_watermark(
        watermarks: impl IntoIterator<Item = (usize, usize)>,
        direction: WatermarkDirection,
    ) {
        let test_index: [(usize, usize); 7] =
            [(0, 2), (0, 3), (0, 4), (1, 1), (1, 3), (4, 2), (8, 1)];
        let items = test_index
            .iter()
            .map(|(vnode, key_index)| gen_key_value(*vnode, *key_index))
            .collect_vec();

        let read_watermark = ReadTableWatermark {
            direction,
            vnode_watermarks: BTreeMap::from_iter(watermarks.into_iter().map(
                |(vnode, key_index)| {
                    (
                        VirtualNode::from_index(vnode),
                        Bytes::from(gen_inner_key(key_index)),
                    )
                },
            )),
        };

        let gen_iters = || {
            let batch = build_batch(
                filter_with_watermarks(items.clone().into_iter(), read_watermark.clone()),
                TABLE_ID,
            );

            let iter = PkPrefixSkipWatermarkIterator::new(
                build_batch(items.clone().into_iter(), TABLE_ID)
                    .unwrap()
                    .into_forward_iter(),
                PkPrefixSkipWatermarkState::new(BTreeMap::from_iter(once((
                    TABLE_ID,
                    read_watermark.clone(),
                )))),
            );
            (batch.map(|batch| batch.into_forward_iter()), iter)
        };
        let (first, second) = gen_iters();
        assert_iter_eq(first, second, None).await;
        for (vnode, key_index) in &test_index {
            let (first, second) = gen_iters();
            assert_iter_eq(first, second, Some((*vnode, *key_index))).await;
        }
        let (last_vnode, last_key_index) = test_index.last().unwrap();
        let (first, second) = gen_iters();
        assert_iter_eq(first, second, Some((*last_vnode, last_key_index + 1))).await;
    }

    fn gen_key_value(vnode: usize, index: usize) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
        (
            gen_key_from_str(VirtualNode::from_index(vnode), &gen_inner_key(index)),
            SharedBufferValue::Insert(Bytes::copy_from_slice(
                format!("{}-value-{}", vnode, index).as_bytes(),
            )),
        )
    }

    #[tokio::test]
    async fn test_no_watermark() {
        test_watermark(empty(), WatermarkDirection::Ascending).await;
        test_watermark(empty(), WatermarkDirection::Descending).await;
    }

    #[tokio::test]
    async fn test_too_low_watermark() {
        test_watermark(vec![(0, 0)], WatermarkDirection::Ascending).await;
        test_watermark(vec![(0, 10)], WatermarkDirection::Descending).await;
    }

    #[tokio::test]
    async fn test_single_watermark() {
        test_watermark(vec![(0, 3)], WatermarkDirection::Ascending).await;
        test_watermark(vec![(0, 3)], WatermarkDirection::Descending).await;
    }

    #[tokio::test]
    async fn test_watermark_vnode_no_data() {
        test_watermark(vec![(3, 3)], WatermarkDirection::Ascending).await;
        test_watermark(vec![(3, 3)], WatermarkDirection::Descending).await;
    }

    #[tokio::test]
    async fn test_filter_all() {
        test_watermark(
            vec![(0, 5), (1, 4), (2, 0), (4, 3), (8, 2)],
            WatermarkDirection::Ascending,
        )
        .await;
        test_watermark(
            vec![(0, 0), (1, 0), (2, 0), (4, 0), (8, 0)],
            WatermarkDirection::Descending,
        )
        .await;
    }

    #[tokio::test]
    async fn test_advance_multi_vnode() {
        test_watermark(vec![(1, 2), (8, 0)], WatermarkDirection::Ascending).await;
    }

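    // Exercises `NonPkPrefixSkipWatermarkState` on keys whose watermark column is the second pk
    // column: first with no watermark (nothing skipped), then with an ascending watermark on one
    // vnode, then on two vnodes, and finally with a descending watermark.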
    #[tokio::test]
    async fn test_non_pk_prefix_watermark() {
        fn gen_key_value(
            vnode: usize,
            col_0: i32,
            col_1: i32,
            col_2: i32,
            col_3: i32,
            pk_serde: &OrderedRowSerde,
            pk_indices: &[usize],
        ) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
            let r = OwnedRow::new(vec![
                Some(ScalarImpl::Int32(col_0)),
                Some(ScalarImpl::Int32(col_1)),
                Some(ScalarImpl::Int32(col_2)),
                Some(ScalarImpl::Int32(col_3)),
            ]);

            let pk = r.project(pk_indices);

            let k1 = serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode));
            let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice(
                format!("{}-value-{}", vnode, col_2).as_bytes(),
            ));
            (k1, v1)
        }

        let watermark_direction = WatermarkDirection::Ascending;

        let watermark_col_serde =
            OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]);
        let pk_serde = OrderedRowSerde::new(
            vec![DataType::Int32, DataType::Int32, DataType::Int32],
            vec![
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
        );

        let pk_indices = vec![0, 2, 3];
        let watermark_col_idx_in_pk = 1;

        {
            let shared_buffer_batch = {
                let mut kv_pairs = (0..10)
                    .map(|i| gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices))
                    .collect_vec();
                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                build_batch(kv_pairs.into_iter(), TABLE_ID)
            }
            .unwrap();

            {
                let read_watermark = ReadTableWatermark {
                    direction: watermark_direction,
                    vnode_watermarks: BTreeMap::default(),
                };

                let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![TABLE_ID]);

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                for i in 0..10 {
                    let (k, _v) = gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices);
                    assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref());
                    iter.next().await.unwrap();
                }
                assert!(!iter.is_valid());
            }

            {
                let watermark = {
                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
                    serialize_pk(r1, &watermark_col_serde)
                };

                let read_watermark = ReadTableWatermark {
                    direction: watermark_direction,
                    vnode_watermarks: BTreeMap::from_iter(once((
                        VirtualNode::from_index(0),
                        watermark.clone(),
                    ))),
                };

                let full_key_filter_key_extractor =
                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

                let table_id_to_vnode =
                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

                let table_id_to_watermark_serde = HashMap::from_iter(once((
                    TABLE_ID,
                    Some((
                        pk_serde.clone(),
                        watermark_col_serde.clone(),
                        watermark_col_idx_in_pk,
                    )),
                )));

                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                    full_key_filter_key_extractor,
                    table_id_to_vnode,
                    table_id_to_watermark_serde,
                    HashMap::default(),
                ));

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                for i in 5..10 {
                    let (k, _v) = gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices);
                    assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref());
                    iter.next().await.unwrap();
                }
                assert!(!iter.is_valid());
            }
        }

        {
            let shared_buffer_batch = {
                let mut kv_pairs = (0..10_i32)
                    .map(|i| gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices))
                    .collect_vec();

                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                build_batch(kv_pairs.into_iter(), TABLE_ID)
            };

            {
                let watermark = {
                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
                    serialize_pk(r1, &watermark_col_serde)
                };

                let read_watermark = ReadTableWatermark {
                    direction: watermark_direction,
                    vnode_watermarks: BTreeMap::from_iter(
                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
                    ),
                };

                let full_key_filter_key_extractor =
                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

                let table_id_to_vnode =
                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

                let table_id_to_watermark_serde = HashMap::from_iter(once((
                    TABLE_ID,
                    Some((
                        pk_serde.clone(),
                        watermark_col_serde.clone(),
                        watermark_col_idx_in_pk,
                    )),
                )));

                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                    full_key_filter_key_extractor,
                    table_id_to_vnode,
                    table_id_to_watermark_serde,
                    HashMap::default(),
                ));

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                let mut kv_pairs = (5..10_i32)
                    .map(|i| {
                        let (k, v) =
                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
                        (k, v)
                    })
                    .collect_vec();
                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                let mut index = 0;
                while iter.is_valid() {
                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                    iter.next().await.unwrap();
                    index += 1;
                }
            }

            {
                let watermark = {
                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
                    serialize_pk(r1, &watermark_col_serde)
                };

                let read_watermark = ReadTableWatermark {
                    direction: watermark_direction,
                    vnode_watermarks: BTreeMap::from_iter(
                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
                    ),
                };

                let full_key_filter_key_extractor =
                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

                let table_id_to_vnode =
                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

                let table_id_to_watermark_serde = HashMap::from_iter(once((
                    TABLE_ID,
                    Some((
                        pk_serde.clone(),
                        watermark_col_serde.clone(),
                        watermark_col_idx_in_pk,
                    )),
                )));

                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                    full_key_filter_key_extractor,
                    table_id_to_vnode,
                    table_id_to_watermark_serde,
                    HashMap::default(),
                ));

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                let mut kv_pairs = (5..10_i32)
                    .map(|i| {
                        let (k, v) =
                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
                        (k, v)
                    })
                    .collect_vec();
                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                let mut index = 0;
                while iter.is_valid() {
                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                    iter.next().await.unwrap();
                    index += 1;
                }
            }

            {
                let watermark = {
                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
                    serialize_pk(r1, &watermark_col_serde)
                };

                let read_watermark = ReadTableWatermark {
                    direction: WatermarkDirection::Descending,
                    vnode_watermarks: BTreeMap::from_iter(
                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
                    ),
                };

                let full_key_filter_key_extractor =
                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

                let table_id_to_vnode =
                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

                let table_id_to_watermark_serde = HashMap::from_iter(once((
                    TABLE_ID,
                    Some((
                        pk_serde.clone(),
                        watermark_col_serde.clone(),
                        watermark_col_idx_in_pk,
                    )),
                )));

                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                    full_key_filter_key_extractor,
                    table_id_to_vnode,
                    table_id_to_watermark_serde,
                    HashMap::default(),
                ));

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                let mut kv_pairs = (0..=5_i32)
                    .map(|i| {
                        let (k, v) =
                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
                        (k, v)
                    })
                    .collect_vec();
                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                let mut index = 0;
                while iter.is_valid() {
                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                    iter.next().await.unwrap();
                    index += 1;
                }
            }
        }
    }

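    // Two tables with different watermark kinds: t1 uses a non-pk-prefix watermark and t2 a
    // pk-prefix watermark. The second block nests a `PkPrefixSkipWatermarkIterator` around a
    // `NonPkPrefixSkipWatermarkIterator` over a `MergeIterator` to apply both at once.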
    #[tokio::test]
    async fn test_mix_watermark() {
        fn gen_key_value(
            vnode: usize,
            col_0: i32,
            col_1: i32,
            col_2: i32,
            col_3: i32,
            pk_serde: &OrderedRowSerde,
            pk_indices: &[usize],
        ) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
            let r = OwnedRow::new(vec![
                Some(ScalarImpl::Int32(col_0)),
                Some(ScalarImpl::Int32(col_1)),
                Some(ScalarImpl::Int32(col_2)),
                Some(ScalarImpl::Int32(col_3)),
            ]);

            let pk = r.project(pk_indices);

            let k1 = serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode));
            let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice(
                format!("{}-value-{}-{}-{}-{}", vnode, col_0, col_1, col_2, col_3).as_bytes(),
            ));

            (k1, v1)
        }

        let watermark_col_serde =
            OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]);
        let t1_pk_serde = OrderedRowSerde::new(
            vec![DataType::Int32, DataType::Int32, DataType::Int32],
            vec![
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
        );

        let t1_pk_indices = vec![0, 2, 3];
        let t1_watermark_col_idx_in_pk = 1;

        let t2_pk_indices = vec![0, 1, 2];

        let t2_pk_serde = OrderedRowSerde::new(
            vec![DataType::Int32, DataType::Int32, DataType::Int32],
            vec![
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
        );

        let t1_id = TABLE_ID;
        let t2_id = TableId::from(t1_id.as_raw_id() + 1);

        let t1_shared_buffer_batch = {
            let mut kv_pairs = (0..10_i32)
                .map(|i| {
                    gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        i,
                        i,
                        &t1_pk_serde,
                        &t1_pk_indices,
                    )
                })
                .collect_vec();

            kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
            build_batch(kv_pairs.into_iter(), t1_id).unwrap()
        };

        let t2_shared_buffer_batch = {
            let mut kv_pairs = (0..10_i32)
                .map(|i| {
                    gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        0,
                        0,
                        &t2_pk_serde,
                        &t2_pk_indices,
                    )
                })
                .collect_vec();

            kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
            build_batch(kv_pairs.into_iter(), t2_id).unwrap()
        };

        let t1_watermark = {
            let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
            serialize_pk(r1, &watermark_col_serde)
        };

        let t1_read_watermark = ReadTableWatermark {
            direction: WatermarkDirection::Ascending,
            vnode_watermarks: BTreeMap::from_iter(
                (0..2).map(|i| (VirtualNode::from_index(i), t1_watermark.clone())),
            ),
        };

        let t2_watermark = {
            let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
            serialize_pk(r1, &watermark_col_serde)
        };

        let t2_read_watermark = ReadTableWatermark {
            direction: WatermarkDirection::Ascending,
            vnode_watermarks: BTreeMap::from_iter(
                (0..2).map(|i| (VirtualNode::from_index(i), t2_watermark.clone())),
            ),
        };

        {
            let t1_iter = t1_shared_buffer_batch.clone().into_forward_iter();
            let t2_iter = t2_shared_buffer_batch.clone().into_forward_iter();
            let iter_vec = vec![t1_iter, t2_iter];
            let merge_iter = MergeIterator::new(iter_vec);

            let full_key_filter_key_extractor =
                FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

            let table_id_to_vnode =
                HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

            let table_id_to_watermark_serde = HashMap::from_iter(once((
                t1_id,
                Some((
                    t1_pk_serde.clone(),
                    watermark_col_serde.clone(),
                    t1_watermark_col_idx_in_pk,
                )),
            )));

            let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                full_key_filter_key_extractor,
                table_id_to_vnode,
                table_id_to_watermark_serde,
                HashMap::default(),
            ));

            let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                merge_iter,
                NonPkPrefixSkipWatermarkState::new(
                    BTreeMap::from_iter(once((TABLE_ID, t1_read_watermark.clone()))),
                    compaction_catalog_agent_ref,
                ),
            );

            iter.rewind().await.unwrap();
            assert!(iter.is_valid());
            let mut t1_kv_pairs = (5..10_i32)
                .map(|i| {
                    let (k, v) = gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        i,
                        i,
                        &t1_pk_serde,
                        &t1_pk_indices,
                    );
                    (k, v)
                })
                .collect_vec();

            t1_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));

            let mut t2_kv_pairs = (0..10_i32)
                .map(|i| {
                    gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        0,
                        0,
                        &t2_pk_serde,
                        &t2_pk_indices,
                    )
                })
                .collect_vec();

            t2_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
            let mut index = 0;
            for _ in 0..t1_kv_pairs.len() {
                assert!(t1_kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                iter.next().await.unwrap();
                index += 1;
            }

            assert!(iter.is_valid());
            assert_eq!(t1_kv_pairs.len(), index);

            index = 0;
            for _ in 0..t2_kv_pairs.len() {
                assert!(t2_kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                iter.next().await.unwrap();
                index += 1;
            }

            assert!(!iter.is_valid());
            assert_eq!(t2_kv_pairs.len(), index);
        }

        {
            let t1_iter = t1_shared_buffer_batch.clone().into_forward_iter();
            let t2_iter = t2_shared_buffer_batch.clone().into_forward_iter();
            let iter_vec = vec![t1_iter, t2_iter];
            let merge_iter = MergeIterator::new(iter_vec);

            let full_key_filter_key_extractor =
                FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

            let table_id_to_vnode = HashMap::from_iter(
                vec![
                    (t1_id, VirtualNode::COUNT_FOR_TEST),
                    (t2_id, VirtualNode::COUNT_FOR_TEST),
                ]
                .into_iter(),
            );

            let table_id_to_watermark_serde = HashMap::from_iter(
                vec![
                    (
                        t1_id,
                        Some((
                            t1_pk_serde.clone(),
                            watermark_col_serde.clone(),
                            t1_watermark_col_idx_in_pk,
                        )),
                    ),
                    (t2_id, None),
                ]
                .into_iter(),
            );

            let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                full_key_filter_key_extractor,
                table_id_to_vnode,
                table_id_to_watermark_serde,
                HashMap::default(),
            ));

            let non_pk_prefix_iter = NonPkPrefixSkipWatermarkIterator::new(
                merge_iter,
                NonPkPrefixSkipWatermarkState::new(
                    BTreeMap::from_iter(once((t1_id, t1_read_watermark.clone()))),
                    compaction_catalog_agent_ref.clone(),
                ),
            );

            let mut mix_iter = PkPrefixSkipWatermarkIterator::new(
                non_pk_prefix_iter,
                PkPrefixSkipWatermarkState::new(BTreeMap::from_iter(once((
                    t2_id,
                    t2_read_watermark.clone(),
                )))),
            );

            mix_iter.rewind().await.unwrap();
            assert!(mix_iter.is_valid());

            let mut t1_kv_pairs = (5..10_i32)
                .map(|i| {
                    let (k, v) = gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        i,
                        i,
                        &t1_pk_serde,
                        &t1_pk_indices,
                    );
                    (k, v)
                })
                .collect_vec();

            t1_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));

            let mut t2_kv_pairs = (0..=5_i32)
                .map(|i| {
                    gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        0,
                        0,
                        &t2_pk_serde,
                        &t2_pk_indices,
                    )
                })
                .collect_vec();

            t2_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));

            let mut index = 0;
            for _ in 0..t1_kv_pairs.len() {
                assert!(
                    t1_kv_pairs[index].0.as_ref() == mix_iter.key().user_key.table_key.as_ref()
                );
                mix_iter.next().await.unwrap();
                index += 1;
            }

            assert!(mix_iter.is_valid());
            assert_eq!(t1_kv_pairs.len(), index);

            index = 0;

            for _ in 0..t2_kv_pairs.len() {
                assert!(
                    t2_kv_pairs[index].0.as_ref() == mix_iter.key().user_key.table_key.as_ref()
                );
                mix_iter.next().await.unwrap();
                index += 1;
            }

            assert!(!mix_iter.is_valid());
            assert_eq!(t2_kv_pairs.len(), index);
        }
    }
}