1use std::cmp::Ordering;
16use std::collections::{BTreeMap, VecDeque};
17
18use bytes::Bytes;
19use risingwave_common::catalog::TableId;
20use risingwave_common::hash::VirtualNode;
21use risingwave_common::row::Row;
22use risingwave_common::types::Datum;
23use risingwave_common::util::row_serde::OrderedRowSerde;
24use risingwave_hummock_sdk::compaction_group::hummock_version_ext::safe_epoch_read_table_watermarks_impl;
25use risingwave_hummock_sdk::key::FullKey;
26use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
27use risingwave_hummock_sdk::table_watermark::{
28 ReadTableWatermark, TableWatermarks, WatermarkDirection,
29};
30
31use super::SkipWatermarkState;
32use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, ValueWatermarkColumnSerdeRef};
33use crate::hummock::HummockResult;
34use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
35use crate::hummock::value::HummockValue;
36use crate::monitor::StoreLocalStatistic;
37
/// Forward iterator adapter that transparently skips entries which the wrapped
/// watermark `state` decides should be deleted, while accumulating per-table
/// statistics (as negative deltas) about the skipped entries.
pub struct SkipWatermarkIterator<I, S> {
    /// The underlying forward iterator being filtered.
    inner: I,
    /// Decides whether the entry currently pointed at should be skipped.
    state: S,
    /// Accumulated stats deltas of all skipped entries, keyed by table id.
    skipped_entry_table_stats: TableStatsMap,
    /// Table id of the most recently skipped entry; when it changes, the
    /// pending `last_table_stats` is flushed into `skipped_entry_table_stats`.
    last_table_id: Option<TableId>,
    /// Pending stats delta accumulated for `last_table_id`.
    last_table_stats: TableStats,
}
48
impl<I: HummockIterator<Direction = Forward>, S: SkipWatermarkState> SkipWatermarkIterator<I, S> {
    /// Wraps `inner` with the given watermark `state`; stats start empty.
    pub fn new(inner: I, state: S) -> Self {
        Self {
            inner,
            state,
            skipped_entry_table_stats: TableStatsMap::default(),
            last_table_id: None,
            last_table_stats: TableStats::default(),
        }
    }

    /// Rebuilds the state's pending watermark queue (delegated to the state).
    fn reset_watermark(&mut self) {
        self.state.reset_watermark();
    }

    /// Clears all accumulated skipped-entry statistics.
    fn reset_skipped_entry_table_stats(&mut self) {
        self.skipped_entry_table_stats = TableStatsMap::default();
        self.last_table_id = None;
        self.last_table_stats = TableStats::default();
    }

    /// Advances `inner` past every entry the state wants deleted, recording a
    /// negative stats delta per skipped entry, and stops at the first entry to
    /// keep (or at end of input).
    async fn advance_key_and_watermark(&mut self) -> HummockResult<()> {
        while self.inner.is_valid() {
            // Stop as soon as the current entry survives the watermark check.
            if !self
                .state
                .should_delete(&self.inner.key(), self.inner.value())
            {
                break;
            }

            // When the skipped entry belongs to a different table than the
            // previous one, flush the pending per-table delta first.
            if self
                .last_table_id
                .is_none_or(|last_table_id| last_table_id != self.inner.key().user_key.table_id)
            {
                self.add_last_table_stats();
                self.last_table_id = Some(self.inner.key().user_key.table_id);
            }
            // Deltas are negative: the entry is being removed from view.
            self.last_table_stats.total_key_count -= 1;
            self.last_table_stats.total_key_size -= self.inner.key().encoded_len() as i64;
            self.last_table_stats.total_value_size -= self.inner.value().encoded_len() as i64;

            self.inner.next().await?;
        }
        // Flush the trailing delta of the last skipped table (no-op if none).
        self.add_last_table_stats();
        Ok(())
    }

    /// Folds the pending `last_table_stats` delta into
    /// `skipped_entry_table_stats` under `last_table_id`, then resets both.
    fn add_last_table_stats(&mut self) {
        let Some(last_table_id) = self.last_table_id.take() else {
            return;
        };
        let delta = std::mem::take(&mut self.last_table_stats);
        let e = self
            .skipped_entry_table_stats
            .entry(last_table_id)
            .or_default();
        e.total_key_count += delta.total_key_count;
        e.total_key_size += delta.total_key_size;
        e.total_value_size += delta.total_value_size;
    }
}
116
impl<I: HummockIterator<Direction = Forward>, S: SkipWatermarkState> HummockIterator
    for SkipWatermarkIterator<I, S>
{
    type Direction = Forward;

    async fn next(&mut self) -> HummockResult<()> {
        self.inner.next().await?;
        // Only pay the skip cost when any watermark remains pending.
        if self.state.has_watermark() {
            self.advance_key_and_watermark().await?;
        }
        Ok(())
    }

    fn key(&self) -> FullKey<&[u8]> {
        self.inner.key()
    }

    fn value(&self) -> HummockValue<&[u8]> {
        self.inner.value()
    }

    fn is_valid(&self) -> bool {
        self.inner.is_valid()
    }

    async fn rewind(&mut self) -> HummockResult<()> {
        // Rebuild watermark queue and stats before repositioning, then skip any
        // leading deleted entries so the iterator starts on a surviving key.
        self.reset_watermark();
        self.reset_skipped_entry_table_stats();
        self.inner.rewind().await?;
        self.advance_key_and_watermark().await?;
        Ok(())
    }

    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
        // Same reset-then-skip protocol as `rewind`.
        self.reset_watermark();
        self.reset_skipped_entry_table_stats();
        self.inner.seek(key).await?;
        self.advance_key_and_watermark().await?;
        Ok(())
    }

    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
        // Merge the deltas of skipped entries into the caller's statistics.
        add_table_stats_map(
            &mut stats.skipped_by_watermark_table_stats,
            &self.skipped_entry_table_stats,
        );
        self.inner.collect_local_statistic(stats)
    }

    fn value_meta(&self) -> ValueMeta {
        self.inner.value_meta()
    }
}
/// Watermark state for tables whose watermark column is the primary-key
/// prefix, so the raw inner key bytes can be compared against the serialized
/// watermark directly (no row deserialization needed).
pub struct PkPrefixSkipWatermarkState {
    /// Source of truth: per-table read watermarks, flattened into
    /// `remain_watermarks` on every `reset_watermark`.
    watermarks: BTreeMap<TableId, ReadTableWatermark>,
    /// Flattened queue of (table, vnode) watermarks; built from `BTreeMap`s so
    /// it is ordered by (table_id, vnode), with the front tracking the range
    /// the iterator is currently in.
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Bytes)>,
}
177
impl SkipWatermarkState for PkPrefixSkipWatermarkState {
    #[inline(always)]
    fn has_watermark(&self) -> bool {
        !self.remain_watermarks.is_empty()
    }

    /// Returns whether `key` should be deleted, comparing the key's
    /// (table_id, vnode) against the front watermark:
    /// - key before front: no watermark applies yet, keep the key;
    /// - equal: filter the inner key bytes against the watermark;
    /// - key past front: the front watermark is stale — advance past it.
    fn should_delete(&mut self, key: &FullKey<&[u8]>, value: HummockValue<&[u8]>) -> bool {
        if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            let key_table_id = key.user_key.table_id;
            let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
            match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
                Ordering::Less => {
                    return false;
                }
                Ordering::Equal => {
                    return direction.key_filter_by_watermark(inner_key, watermark);
                }
                Ordering::Greater => {
                    return self.advance_watermark(key, value);
                }
            }
        }
        false
    }

    /// Rebuilds `remain_watermarks` by flattening `watermarks`; the nested
    /// `BTreeMap` iteration yields entries sorted by (table_id, vnode).
    fn reset_watermark(&mut self) {
        self.remain_watermarks = self
            .watermarks
            .iter()
            .flat_map(|(table_id, read_watermarks)| {
                read_watermarks
                    .vnode_watermarks
                    .iter()
                    .map(|(vnode, watermarks)| {
                        (
                            *table_id,
                            *vnode,
                            read_watermarks.direction,
                            watermarks.clone(),
                        )
                    })
            })
            .collect();
    }

    /// Pops watermarks that are entirely before `key`'s (table_id, vnode),
    /// then decides whether `key` is filtered by the matching watermark.
    /// Returns `true` iff the key should be deleted.
    fn advance_watermark(&mut self, key: &FullKey<&[u8]>, _value: HummockValue<&[u8]>) -> bool {
        let key_table_id = key.user_key.table_id;
        let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
        while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
                Ordering::Less => {
                    // Watermark range lies entirely before the key: discard it.
                    self.remain_watermarks.pop_front();
                    continue;
                }
                Ordering::Equal => {
                    match direction {
                        WatermarkDirection::Ascending => {
                            match inner_key.cmp(watermark.as_ref()) {
                                Ordering::Less => {
                                    // Below an ascending watermark: delete.
                                    return true;
                                }
                                Ordering::Equal | Ordering::Greater => {
                                    // Key reached the watermark; since input is
                                    // sorted, no later key of this vnode can be
                                    // filtered — drop the watermark.
                                    self.remain_watermarks.pop_front();
                                    #[cfg(debug_assertions)]
                                    {
                                        // The next watermark must be for a
                                        // strictly later (table, vnode).
                                        if let Some((next_table_id, next_vnode, _, _)) =
                                            self.remain_watermarks.front()
                                        {
                                            assert!(
                                                (next_table_id, next_vnode)
                                                    > (&key_table_id, &key_vnode)
                                            );
                                        }
                                    }
                                    return false;
                                }
                            }
                        }
                        WatermarkDirection::Descending => {
                            // Above a descending watermark: delete. Keep the
                            // watermark — later (larger) keys may also exceed it.
                            return match inner_key.cmp(watermark.as_ref()) {
                                Ordering::Less | Ordering::Equal => false,
                                Ordering::Greater => true,
                            };
                        }
                    }
                }
                Ordering::Greater => {
                    return false;
                }
            }
        }
        false
    }
}
289
290impl PkPrefixSkipWatermarkState {
291 pub fn new(watermarks: BTreeMap<TableId, ReadTableWatermark>) -> Self {
292 Self {
293 remain_watermarks: VecDeque::new(),
294 watermarks,
295 }
296 }
297
298 pub fn from_safe_epoch_watermarks(
299 safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
300 ) -> Self {
301 let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
302 Self::new(watermarks)
303 }
304}
305
/// Watermark state for tables whose watermark column is *not* the primary-key
/// prefix: keys must be deserialized with the table's pk serde and the
/// watermark column extracted by index before comparison.
pub struct NonPkPrefixSkipWatermarkState {
    /// Source of truth: per-table read watermarks.
    watermarks: BTreeMap<TableId, ReadTableWatermark>,
    /// Flattened (table, vnode) queue with watermarks deserialized to datums.
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Datum)>,
    /// Catalog access used to look up per-table serde information lazily.
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,

    /// Cached `(pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk)`
    /// for the current front watermark's table; cleared when the front is
    /// popped in `advance_watermark` or on `reset_watermark`.
    last_serde: Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
    /// Table id of the last key observed by `should_delete`.
    last_table_id: Option<TableId>,
}
314
315impl NonPkPrefixSkipWatermarkState {
316 pub fn new(
317 watermarks: BTreeMap<TableId, ReadTableWatermark>,
318 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
319 ) -> Self {
320 Self {
321 remain_watermarks: VecDeque::new(),
322 watermarks,
323 compaction_catalog_agent_ref,
324 last_serde: None,
325 last_table_id: None,
326 }
327 }
328
329 pub fn from_safe_epoch_watermarks(
330 safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
331 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
332 ) -> Self {
333 let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
334 Self::new(watermarks, compaction_catalog_agent_ref)
335 }
336}
337
impl SkipWatermarkState for NonPkPrefixSkipWatermarkState {
    #[inline(always)]
    fn has_watermark(&self) -> bool {
        !self.remain_watermarks.is_empty()
    }

    /// Returns whether `key` should be deleted by deserializing its pk prefix
    /// and comparing the watermark column datum against the front watermark.
    fn should_delete(&mut self, key: &FullKey<&[u8]>, value: HummockValue<&[u8]>) -> bool {
        if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            let key_table_id = key.user_key.table_id;
            {
                // Track the table of the last observed key.
                // NOTE(review): this does not invalidate `last_serde`; that
                // appears safe because `advance_watermark` clears `last_serde`
                // whenever the front watermark is popped — confirm invariant.
                if self
                    .last_table_id
                    .is_none_or(|last_table_id| last_table_id != key_table_id)
                {
                    self.last_table_id = Some(key_table_id);
                }
            }

            let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
            match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
                Ordering::Less => {
                    // Key is before any remaining watermarked range: keep it.
                    return false;
                }
                Ordering::Equal => {
                    // Lazily fetch and cache the serdes for this table.
                    if self.last_serde.is_none() {
                        self.last_serde =
                            self.compaction_catalog_agent_ref.watermark_serde(*table_id);
                    }
                    let (pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk) =
                        self.last_serde.as_ref().unwrap();
                    let row = pk_prefix_serde
                        .deserialize(inner_key)
                        .unwrap_or_else(|_| {
                            panic!("Failed to deserialize pk_prefix inner_key {:?} serde data_types {:?} order_types {:?}", inner_key, pk_prefix_serde.get_data_types(), pk_prefix_serde.get_order_types());
                        });
                    // Compare the extracted watermark column against the watermark datum.
                    let watermark_col_in_pk = row.datum_at(*watermark_col_idx_in_pk);
                    return direction.datum_filter_by_watermark(
                        watermark_col_in_pk,
                        watermark,
                        watermark_col_serde.get_order_types()[0],
                    );
                }
                Ordering::Greater => {
                    // Front watermark is stale: advance past it and retry.
                    return self.advance_watermark(key, value);
                }
            }
        }
        false
    }

    /// Rebuilds `remain_watermarks`, deserializing each watermark's bytes into
    /// its column datum; tables without serde info are silently skipped.
    fn reset_watermark(&mut self) {
        self.last_serde = None;
        self.remain_watermarks = self
            .watermarks
            .iter()
            .flat_map(|(table_id, read_watermarks)| {
                let watermark_serde = self.compaction_catalog_agent_ref.watermark_serde(*table_id).map(|(_pk_serde, watermark_serde, _watermark_col_idx_in_pk)| watermark_serde);

                read_watermarks
                    .vnode_watermarks
                    .iter()
                    .flat_map(move |(vnode, watermarks)| {
                        // `?` drops entries for tables lacking serde info.
                        let watermark_serde = watermark_serde.as_ref()?;
                        Some((
                            *table_id,
                            *vnode,
                            read_watermarks.direction,
                            {
                                let row = watermark_serde
                                    .deserialize(watermarks).unwrap_or_else(|_| {
                                        panic!("Failed to deserialize watermark {:?} serde data_types {:?} order_types {:?}", watermarks, watermark_serde.get_data_types(), watermark_serde.get_order_types());
                                    });
                                row[0].clone()
                            },
                        ))
                    })
            })
            .collect();
    }

    /// Pops watermarks entirely before `key`'s (table_id, vnode), then decides
    /// whether `key` is filtered. Returns `true` iff the key should be deleted.
    fn advance_watermark(&mut self, key: &FullKey<&[u8]>, _value: HummockValue<&[u8]>) -> bool {
        let key_table_id = key.user_key.table_id;
        let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
        while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
                Ordering::Less => {
                    // Drop stale watermark and its cached serde together.
                    self.remain_watermarks.pop_front();
                    self.last_serde = None;
                    continue;
                }
                Ordering::Equal => {
                    if self.last_serde.is_none() {
                        self.last_serde =
                            self.compaction_catalog_agent_ref.watermark_serde(*table_id);
                    }
                    let (pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk) =
                        self.last_serde.as_ref().unwrap();

                    let row = pk_prefix_serde
                        .deserialize(inner_key)
                        .unwrap_or_else(|_| {
                            panic!("Failed to deserialize pk_prefix inner_key {:?} serde data_types {:?} order_types {:?}", inner_key, pk_prefix_serde.get_data_types(), pk_prefix_serde.get_order_types());
                        });
                    let watermark_col_in_pk = row.datum_at(*watermark_col_idx_in_pk);

                    return direction.datum_filter_by_watermark(
                        watermark_col_in_pk,
                        watermark,
                        watermark_col_serde.get_order_types()[0],
                    );
                }
                Ordering::Greater => {
                    return false;
                }
            }
        }
        false
    }
}
460
/// [`SkipWatermarkIterator`] for tables whose watermark column is the pk prefix.
pub type PkPrefixSkipWatermarkIterator<I> = SkipWatermarkIterator<I, PkPrefixSkipWatermarkState>;

/// [`SkipWatermarkIterator`] for tables whose watermark column is a non-prefix pk column.
pub type NonPkPrefixSkipWatermarkIterator<I> =
    SkipWatermarkIterator<I, NonPkPrefixSkipWatermarkState>;

/// [`SkipWatermarkIterator`] for tables whose watermark column is read from the value.
pub type ValueSkipWatermarkIterator<I> = SkipWatermarkIterator<I, ValueSkipWatermarkState>;
467
/// Watermark state that filters by a watermark column stored in the *value*:
/// the value bytes are deserialized to extract the column before comparison.
pub struct ValueSkipWatermarkState {
    /// Source of truth: per-table read watermarks.
    pub watermarks: BTreeMap<TableId, ReadTableWatermark>,
    /// Flattened (table, vnode) watermark queue, rebuilt on `reset_watermark`.
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Bytes)>,
    /// Catalog access used to look up the value-watermark serde lazily.
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    /// Cached value-watermark serde of the current front watermark's table.
    last_serde: Option<ValueWatermarkColumnSerdeRef>,
    /// Table id of the last key observed by `should_delete`.
    last_table_id: Option<TableId>,
}
475
476impl ValueSkipWatermarkState {
477 pub fn new(
478 watermarks: BTreeMap<TableId, ReadTableWatermark>,
479 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
480 ) -> Self {
481 Self {
482 remain_watermarks: VecDeque::new(),
483 watermarks,
484 compaction_catalog_agent_ref,
485 last_serde: None,
486 last_table_id: None,
487 }
488 }
489
490 pub fn from_safe_epoch_watermarks(
491 safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
492 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
493 ) -> Self {
494 let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
495 Self::new(watermarks, compaction_catalog_agent_ref)
496 }
497
498 pub fn may_delete(&self, key: &FullKey<&[u8]>) -> bool {
499 let table_id = key.user_key.table_id;
500 self.watermarks.contains_key(&table_id)
501 }
502}
503
impl SkipWatermarkState for ValueSkipWatermarkState {
    #[inline(always)]
    fn has_watermark(&self) -> bool {
        !self.remain_watermarks.is_empty()
    }

    /// Returns whether `key` should be deleted by extracting the watermark
    /// column from the *value* bytes and comparing it with the front watermark.
    /// Deserialization failures and missing columns are logged and treated as
    /// "keep" (best effort, never lose data).
    fn should_delete(&mut self, key: &FullKey<&[u8]>, value: HummockValue<&[u8]>) -> bool {
        if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            let key_table_id = key.user_key.table_id;
            let key_vnode = key.user_key.table_key.vnode_part();
            // Track the table of the last observed key.
            if self
                .last_table_id
                .is_none_or(|last_table_id| last_table_id != key_table_id)
            {
                self.last_table_id = Some(key_table_id);
            }
            match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
                Ordering::Less => {
                    return false;
                }
                Ordering::Equal => {
                    // Only `Put` values carry a payload to inspect; keep others.
                    let HummockValue::Put(value) = value else {
                        return false;
                    };
                    // Lazily fetch and cache the serde for this table.
                    if self.last_serde.is_none() {
                        self.last_serde = self
                            .compaction_catalog_agent_ref
                            .value_watermark_serde(*table_id);
                    }
                    let last_serde = self.last_serde.as_ref().unwrap();
                    let Ok(watermark_column_value) = last_serde.deserialize(value) else {
                        tracing::error!(
                            ?table_id,
                            ?vnode,
                            ?value,
                            "Failed to deserialize watermark column"
                        );
                        return false;
                    };
                    let Some(watermark_column_value) = watermark_column_value else {
                        tracing::debug!(
                            ?table_id,
                            ?vnode,
                            ?value,
                            "The specified watermark column was not found in the value, likely due to the use of a non-current catalog. Restarting the compactor should resolve this issue."
                        );
                        return false;
                    };
                    return direction.key_filter_by_watermark(&watermark_column_value, watermark);
                }
                Ordering::Greater => {
                    // Front watermark is stale: advance past it and retry.
                    return self.advance_watermark(key, value);
                }
            }
        }
        false
    }

    /// Rebuilds `remain_watermarks` from `watermarks`; the nested `BTreeMap`
    /// iteration yields entries ordered by (table_id, vnode).
    fn reset_watermark(&mut self) {
        self.last_serde = None;
        self.remain_watermarks = self
            .watermarks
            .iter()
            .flat_map(|(table_id, read_watermarks)| {
                read_watermarks
                    .vnode_watermarks
                    .iter()
                    .map(|(vnode, watermarks)| {
                        (
                            *table_id,
                            *vnode,
                            read_watermarks.direction,
                            watermarks.clone(),
                        )
                    })
            })
            .collect();
    }

    /// Pops watermarks entirely before `key`'s (table_id, vnode), then applies
    /// the same value-based filtering as `should_delete` for the matching one.
    /// Returns `true` iff the key should be deleted.
    fn advance_watermark(&mut self, key: &FullKey<&[u8]>, value: HummockValue<&[u8]>) -> bool {
        let key_table_id = key.user_key.table_id;
        let key_vnode = key.user_key.table_key.vnode_part();
        while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
                Ordering::Less => {
                    // Drop stale watermark and its cached serde together.
                    self.remain_watermarks.pop_front();
                    self.last_serde = None;
                    continue;
                }
                Ordering::Equal => {
                    let HummockValue::Put(value) = value else {
                        return false;
                    };
                    if self.last_serde.is_none() {
                        self.last_serde = self
                            .compaction_catalog_agent_ref
                            .value_watermark_serde(*table_id);
                    }
                    let last_serde = self.last_serde.as_ref().unwrap();
                    let Ok(watermark_column_value) = last_serde.deserialize(value) else {
                        tracing::error!(
                            ?table_id,
                            ?vnode,
                            ?value,
                            "Failed to deserialize watermark column."
                        );
                        return false;
                    };
                    let Some(watermark_column_value) = watermark_column_value else {
                        tracing::warn!(
                            ?table_id,
                            ?vnode,
                            ?value,
                            "The specified watermark column was not found in the value, likely due to the use of a non-current catalog. Restarting the compactor should resolve this issue."
                        );
                        return false;
                    };
                    return direction.key_filter_by_watermark(&watermark_column_value, watermark);
                }
                Ordering::Greater => {
                    return false;
                }
            }
        }
        false
    }
}
633
634#[cfg(test)]
635mod tests {
636 use std::collections::{BTreeMap, HashMap};
637 use std::iter::{empty, once};
638 use std::sync::Arc;
639
640 use bytes::Bytes;
641 use itertools::Itertools;
642 use risingwave_common::catalog::TableId;
643 use risingwave_common::hash::VirtualNode;
644 use risingwave_common::row::{OwnedRow, RowExt};
645 use risingwave_common::types::{DataType, ScalarImpl};
646 use risingwave_common::util::epoch::test_epoch;
647 use risingwave_common::util::row_serde::OrderedRowSerde;
648 use risingwave_common::util::sort_util::OrderType;
649 use risingwave_hummock_sdk::EpochWithGap;
650 use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey, gen_key_from_str};
651 use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, WatermarkDirection};
652
653 use super::PkPrefixSkipWatermarkState;
654 use crate::compaction_catalog_manager::{
655 CompactionCatalogAgent, FilterKeyExtractorImpl, FullKeyFilterKeyExtractor,
656 };
657 use crate::hummock::iterator::{
658 HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
659 NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator,
660 };
661 use crate::hummock::shared_buffer::shared_buffer_batch::{
662 SharedBufferBatch, SharedBufferValue,
663 };
664 use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode};
665
    // Epoch used for every test batch.
    const EPOCH: u64 = test_epoch(1);
    // Table id shared by all test fixtures.
    const TABLE_ID: TableId = TableId::new(233);
668
    /// Asserts that `second` (the watermark-skipping iterator) yields exactly
    /// the same entries as `first` (the expected, pre-filtered iterator).
    /// `first = None` means no entry is expected to survive. When `seek_key`
    /// is given, both iterators are seeked to that (vnode, key_index) first;
    /// otherwise both are rewound.
    async fn assert_iter_eq(
        mut first: Option<impl HummockIterator>,
        mut second: impl HummockIterator,
        seek_key: Option<(usize, usize)>,
    ) {
        if let Some((vnode, key_index)) = seek_key {
            let (seek_key, _) = gen_key_value(vnode, key_index);
            let full_key = FullKey {
                user_key: UserKey {
                    table_id: TABLE_ID,
                    table_key: seek_key,
                },
                epoch_with_gap: EpochWithGap::new_from_epoch(EPOCH),
            };
            if let Some(first) = &mut first {
                first.seek(full_key.to_ref()).await.unwrap();
            }
            second.seek(full_key.to_ref()).await.unwrap()
        } else {
            if let Some(first) = &mut first {
                first.rewind().await.unwrap();
            }
            second.rewind().await.unwrap();
        }

        // Lock-step comparison of keys and values.
        if let Some(first) = &mut first {
            while first.is_valid() {
                assert!(second.is_valid());
                let first_key = first.key();
                let second_key = second.key();
                assert_eq!(first_key, second_key);
                assert_eq!(first.value(), second.value());
                first.next().await.unwrap();
                second.next().await.unwrap();
            }
        }
        // `second` must be exhausted too (and empty when `first` is `None`).
        assert!(!second.is_valid());
    }
707
708 fn build_batch(
709 pairs: impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)>,
710 table_id: TableId,
711 ) -> Option<SharedBufferBatch> {
712 let pairs: Vec<_> = pairs.collect();
713 if pairs.is_empty() {
714 None
715 } else {
716 Some(SharedBufferBatch::for_test(pairs, EPOCH, table_id))
717 }
718 }
719
720 fn filter_with_watermarks(
721 iter: impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)>,
722 table_watermarks: ReadTableWatermark,
723 ) -> impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)> {
724 iter.filter(move |(key, _)| {
725 if let Some(watermark) = table_watermarks.vnode_watermarks.get(&key.vnode_part()) {
726 !table_watermarks
727 .direction
728 .key_filter_by_watermark(key.key_part(), watermark)
729 } else {
730 true
731 }
732 })
733 }
734
735 fn gen_inner_key(index: usize) -> String {
736 format!("key{:5}", index)
737 }
738
    /// Shared test driver: builds a fixed key set across several vnodes,
    /// installs the given `(vnode, key_index)` watermarks, and checks that the
    /// skipping iterator matches a manually filtered reference — after rewind,
    /// after seeking to every existing key, and after seeking past the end.
    async fn test_watermark(
        watermarks: impl IntoIterator<Item = (usize, usize)>,
        direction: WatermarkDirection,
    ) {
        let test_index: [(usize, usize); 7] =
            [(0, 2), (0, 3), (0, 4), (1, 1), (1, 3), (4, 2), (8, 1)];
        let items = test_index
            .iter()
            .map(|(vnode, key_index)| gen_key_value(*vnode, *key_index))
            .collect_vec();

        let read_watermark = ReadTableWatermark {
            direction,
            vnode_watermarks: BTreeMap::from_iter(watermarks.into_iter().map(
                |(vnode, key_index)| {
                    (
                        VirtualNode::from_index(vnode),
                        Bytes::from(gen_inner_key(key_index)),
                    )
                },
            )),
        };

        // Builds (expected-reference iterator, iterator-under-test) pairs.
        let gen_iters = || {
            let batch = build_batch(
                filter_with_watermarks(items.clone().into_iter(), read_watermark.clone()),
                TABLE_ID,
            );

            let iter = PkPrefixSkipWatermarkIterator::new(
                build_batch(items.clone().into_iter(), TABLE_ID)
                    .unwrap()
                    .into_forward_iter(),
                PkPrefixSkipWatermarkState::new(BTreeMap::from_iter(once((
                    TABLE_ID,
                    read_watermark.clone(),
                )))),
            );
            (batch.map(|batch| batch.into_forward_iter()), iter)
        };
        // Full scan from the start.
        let (first, second) = gen_iters();
        assert_iter_eq(first, second, None).await;
        // Seek to every existing key.
        for (vnode, key_index) in &test_index {
            let (first, second) = gen_iters();
            assert_iter_eq(first, second, Some((*vnode, *key_index))).await;
        }
        // Seek past the last key: both iterators must be exhausted.
        let (last_vnode, last_key_index) = test_index.last().unwrap();
        let (first, second) = gen_iters();
        assert_iter_eq(first, second, Some((*last_vnode, last_key_index + 1))).await;
    }
789
790 fn gen_key_value(vnode: usize, index: usize) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
791 (
792 gen_key_from_str(VirtualNode::from_index(vnode), &gen_inner_key(index)),
793 SharedBufferValue::Insert(Bytes::copy_from_slice(
794 format!("{}-value-{}", vnode, index).as_bytes(),
795 )),
796 )
797 }
798
799 #[tokio::test]
800 async fn test_no_watermark() {
801 test_watermark(empty(), WatermarkDirection::Ascending).await;
802 test_watermark(empty(), WatermarkDirection::Descending).await;
803 }
804
805 #[tokio::test]
806 async fn test_too_low_watermark() {
807 test_watermark(vec![(0, 0)], WatermarkDirection::Ascending).await;
808 test_watermark(vec![(0, 10)], WatermarkDirection::Descending).await;
809 }
810
811 #[tokio::test]
812 async fn test_single_watermark() {
813 test_watermark(vec![(0, 3)], WatermarkDirection::Ascending).await;
814 test_watermark(vec![(0, 3)], WatermarkDirection::Descending).await;
815 }
816
817 #[tokio::test]
818 async fn test_watermark_vnode_no_data() {
819 test_watermark(vec![(3, 3)], WatermarkDirection::Ascending).await;
820 test_watermark(vec![(3, 3)], WatermarkDirection::Descending).await;
821 }
822
823 #[tokio::test]
824 async fn test_filter_all() {
825 test_watermark(
826 vec![(0, 5), (1, 4), (2, 0), (4, 3), (8, 2)],
827 WatermarkDirection::Ascending,
828 )
829 .await;
830 test_watermark(
831 vec![(0, 0), (1, 0), (2, 0), (4, 0), (8, 0)],
832 WatermarkDirection::Descending,
833 )
834 .await;
835 }
836
837 #[tokio::test]
838 async fn test_advance_multi_vnode() {
839 test_watermark(vec![(1, 2), (8, 0)], WatermarkDirection::Ascending).await;
840 }
841
    #[tokio::test]
    /// End-to-end test of `NonPkPrefixSkipWatermarkIterator`: the watermark
    /// column (col_2) is the second column of the pk, so filtering requires
    /// deserializing keys. Covers: empty watermarks, one vnode, two vnodes
    /// (ascending), and the descending direction.
    async fn test_non_pk_prefix_watermark() {
        // Builds a (key, value) pair whose pk is (col_0, col_2, col_3); col_2
        // is the watermark column (index 1 within the pk).
        fn gen_key_value(
            vnode: usize,
            col_0: i32,
            col_1: i32,
            col_2: i32,
            col_3: i32,
            pk_serde: &OrderedRowSerde,
            pk_indices: &[usize],
        ) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
            let r = OwnedRow::new(vec![
                Some(ScalarImpl::Int32(col_0)),
                Some(ScalarImpl::Int32(col_1)),
                Some(ScalarImpl::Int32(col_2)), Some(ScalarImpl::Int32(col_3)),
            ]);

            let pk = r.project(pk_indices);

            let k1 = serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode));
            let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice(
                format!("{}-value-{}", vnode, col_2).as_bytes(),
            ));
            (k1, v1)
        }

        let watermark_direction = WatermarkDirection::Ascending;

        let watermark_col_serde =
            OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]);
        let pk_serde = OrderedRowSerde::new(
            vec![DataType::Int32, DataType::Int32, DataType::Int32],
            vec![
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
        );

        let pk_indices = vec![0, 2, 3];
        let watermark_col_idx_in_pk = 1;

        // Scenario 1: all keys in vnode 0.
        {
            let shared_buffer_batch = {
                let mut kv_pairs = (0..10)
                    .map(|i| gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices))
                    .collect_vec();
                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                build_batch(kv_pairs.into_iter(), TABLE_ID)
            }
            .unwrap();

            // 1a: no vnode watermarks — all 10 entries survive.
            {
                let read_watermark = ReadTableWatermark {
                    direction: watermark_direction,
                    vnode_watermarks: BTreeMap::default(),
                };

                let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![TABLE_ID]);

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                for i in 0..10 {
                    let (k, _v) = gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices);
                    assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref());
                    iter.next().await.unwrap();
                }
                assert!(!iter.is_valid());
            }

            // 1b: ascending watermark 5 on vnode 0 — entries with col_2 < 5 skipped.
            {
                let watermark = {
                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
                    serialize_pk(r1, &watermark_col_serde)
                };

                let read_watermark = ReadTableWatermark {
                    direction: watermark_direction,
                    vnode_watermarks: BTreeMap::from_iter(once((
                        VirtualNode::from_index(0),
                        watermark.clone(),
                    ))),
                };

                let full_key_filter_key_extractor =
                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

                let table_id_to_vnode =
                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

                let table_id_to_watermark_serde = HashMap::from_iter(once((
                    TABLE_ID,
                    Some((
                        pk_serde.clone(),
                        watermark_col_serde.clone(),
                        watermark_col_idx_in_pk,
                    )),
                )));

                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                    full_key_filter_key_extractor,
                    table_id_to_vnode,
                    table_id_to_watermark_serde,
                    HashMap::default(),
                ));

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                for i in 5..10 {
                    let (k, _v) = gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices);
                    assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref());
                    iter.next().await.unwrap();
                }
                assert!(!iter.is_valid());
            }
        }

        // Scenario 2: keys interleaved across vnodes 0 and 1, with descending
        // col_0 so key order differs from col_2 (watermark column) order.
        {
            let shared_buffer_batch = {
                let mut kv_pairs = (0..10_i32)
                    .map(|i| gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices))
                    .collect_vec();

                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                build_batch(kv_pairs.into_iter(), TABLE_ID)
            };

            // 2a: ascending watermark 5 on both vnodes — only col_2 >= 5 survive.
            {
                let watermark = {
                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
                    serialize_pk(r1, &watermark_col_serde)
                };

                let read_watermark = ReadTableWatermark {
                    direction: watermark_direction,
                    vnode_watermarks: BTreeMap::from_iter(
                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
                    ),
                };

                let full_key_filter_key_extractor =
                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

                let table_id_to_vnode =
                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

                let table_id_to_watermark_serde = HashMap::from_iter(once((
                    TABLE_ID,
                    Some((
                        pk_serde.clone(),
                        watermark_col_serde.clone(),
                        watermark_col_idx_in_pk,
                    )),
                )));

                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                    full_key_filter_key_extractor,
                    table_id_to_vnode,
                    table_id_to_watermark_serde,
                    HashMap::default(),
                ));

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                let mut kv_pairs = (5..10_i32)
                    .map(|i| {
                        let (k, v) =
                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
                        (k, v)
                    })
                    .collect_vec();
                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                let mut index = 0;
                while iter.is_valid() {
                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                    iter.next().await.unwrap();
                    index += 1;
                }
            }

            // 2b: same setup again with a freshly built iterator — verifies the
            // state is correctly rebuilt from scratch.
            {
                let watermark = {
                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
                    serialize_pk(r1, &watermark_col_serde)
                };

                let read_watermark = ReadTableWatermark {
                    direction: watermark_direction,
                    vnode_watermarks: BTreeMap::from_iter(
                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
                    ),
                };

                let full_key_filter_key_extractor =
                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

                let table_id_to_vnode =
                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

                let table_id_to_watermark_serde = HashMap::from_iter(once((
                    TABLE_ID,
                    Some((
                        pk_serde.clone(),
                        watermark_col_serde.clone(),
                        watermark_col_idx_in_pk,
                    )),
                )));

                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                    full_key_filter_key_extractor,
                    table_id_to_vnode,
                    table_id_to_watermark_serde,
                    HashMap::default(),
                ));

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                let mut kv_pairs = (5..10_i32)
                    .map(|i| {
                        let (k, v) =
                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
                        (k, v)
                    })
                    .collect_vec();
                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                let mut index = 0;
                while iter.is_valid() {
                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                    iter.next().await.unwrap();
                    index += 1;
                }
            }

            // 2c: descending watermark 5 on both vnodes — only col_2 <= 5 survive.
            {
                let watermark = {
                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
                    serialize_pk(r1, &watermark_col_serde)
                };

                let read_watermark = ReadTableWatermark {
                    direction: WatermarkDirection::Descending,
                    vnode_watermarks: BTreeMap::from_iter(
                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
                    ),
                };

                let full_key_filter_key_extractor =
                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

                let table_id_to_vnode =
                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

                let table_id_to_watermark_serde = HashMap::from_iter(once((
                    TABLE_ID,
                    Some((
                        pk_serde.clone(),
                        watermark_col_serde.clone(),
                        watermark_col_idx_in_pk,
                    )),
                )));

                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                    full_key_filter_key_extractor,
                    table_id_to_vnode,
                    table_id_to_watermark_serde,
                    HashMap::default(),
                ));

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                let mut kv_pairs = (0..=5_i32)
                    .map(|i| {
                        let (k, v) =
                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
                        (k, v)
                    })
                    .collect_vec();
                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                let mut index = 0;
                while iter.is_valid() {
                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                    iter.next().await.unwrap();
                    index += 1;
                }
            }
        }
    }
1177
1178 #[tokio::test]
1179 async fn test_mix_watermark() {
1180 fn gen_key_value(
1181 vnode: usize,
1182 col_0: i32,
1183 col_1: i32,
1184 col_2: i32,
1185 col_3: i32,
1186 pk_serde: &OrderedRowSerde,
1187 pk_indices: &[usize],
1188 ) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
1189 let r = OwnedRow::new(vec![
1190 Some(ScalarImpl::Int32(col_0)),
1191 Some(ScalarImpl::Int32(col_1)),
1192 Some(ScalarImpl::Int32(col_2)), Some(ScalarImpl::Int32(col_3)),
1194 ]);
1195
1196 let pk = r.project(pk_indices);
1197
1198 let k1 = serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode));
1199 let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice(
1200 format!("{}-value-{}-{}-{}-{}", vnode, col_0, col_1, col_2, col_3).as_bytes(),
1201 ));
1202
1203 (k1, v1)
1204 }
1205
1206 let watermark_col_serde =
1207 OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]);
1208 let t1_pk_serde = OrderedRowSerde::new(
1209 vec![DataType::Int32, DataType::Int32, DataType::Int32],
1210 vec![
1211 OrderType::ascending(),
1212 OrderType::ascending(),
1213 OrderType::ascending(),
1214 ],
1215 );
1216
1217 let t1_pk_indices = vec![0, 2, 3];
1218 let t1_watermark_col_idx_in_pk = 1;
1219
1220 let t2_pk_indices = vec![0, 1, 2];
1221
1222 let t2_pk_serde = OrderedRowSerde::new(
1223 vec![DataType::Int32, DataType::Int32, DataType::Int32],
1224 vec![
1225 OrderType::ascending(),
1226 OrderType::ascending(),
1227 OrderType::ascending(),
1228 ],
1229 );
1230
1231 let t1_id = TABLE_ID;
1232 let t2_id = TableId::from(t1_id.as_raw_id() + 1);
1233
1234 let t1_shared_buffer_batch = {
1235 let mut kv_pairs = (0..10_i32)
1236 .map(|i| {
1237 gen_key_value(
1238 i as usize % 2,
1239 10 - i,
1240 0,
1241 i,
1242 i,
1243 &t1_pk_serde,
1244 &t1_pk_indices,
1245 )
1246 })
1247 .collect_vec();
1248
1249 kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1250 build_batch(kv_pairs.into_iter(), t1_id).unwrap()
1251 };
1252
1253 let t2_shared_buffer_batch = {
1254 let mut kv_pairs = (0..10_i32)
1255 .map(|i| {
1256 gen_key_value(
1257 i as usize % 2,
1258 10 - i,
1259 0,
1260 0,
1261 0,
1262 &t2_pk_serde,
1263 &t2_pk_indices,
1264 )
1265 })
1266 .collect_vec();
1267
1268 kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1269 build_batch(kv_pairs.into_iter(), t2_id).unwrap()
1270 };
1271
1272 let t1_watermark = {
1273 let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
1274 serialize_pk(r1, &watermark_col_serde)
1275 };
1276
1277 let t1_read_watermark = ReadTableWatermark {
1278 direction: WatermarkDirection::Ascending,
1279 vnode_watermarks: BTreeMap::from_iter(
1280 (0..2).map(|i| (VirtualNode::from_index(i), t1_watermark.clone())),
1281 ),
1282 };
1283
1284 let t2_watermark = {
1285 let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
1286 serialize_pk(r1, &watermark_col_serde)
1287 };
1288
1289 let t2_read_watermark = ReadTableWatermark {
1290 direction: WatermarkDirection::Ascending,
1291 vnode_watermarks: BTreeMap::from_iter(
1292 (0..2).map(|i| (VirtualNode::from_index(i), t2_watermark.clone())),
1293 ),
1294 };
1295
1296 {
1297 let t1_iter = t1_shared_buffer_batch.clone().into_forward_iter();
1299 let t2_iter = t2_shared_buffer_batch.clone().into_forward_iter();
1300 let iter_vec = vec![t1_iter, t2_iter];
1301 let merge_iter = MergeIterator::new(iter_vec);
1302
1303 let full_key_filter_key_extractor =
1304 FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
1305
1306 let table_id_to_vnode =
1307 HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));
1308
1309 let table_id_to_watermark_serde = HashMap::from_iter(once((
1310 t1_id,
1311 Some((
1312 t1_pk_serde.clone(),
1313 watermark_col_serde.clone(),
1314 t1_watermark_col_idx_in_pk,
1315 )),
1316 )));
1317
1318 let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
1319 full_key_filter_key_extractor,
1320 table_id_to_vnode,
1321 table_id_to_watermark_serde,
1322 HashMap::default(),
1323 ));
1324
1325 let mut iter = NonPkPrefixSkipWatermarkIterator::new(
1326 merge_iter,
1327 NonPkPrefixSkipWatermarkState::new(
1328 BTreeMap::from_iter(once((TABLE_ID, t1_read_watermark.clone()))),
1329 compaction_catalog_agent_ref,
1330 ),
1331 );
1332
1333 iter.rewind().await.unwrap();
1334 assert!(iter.is_valid());
1335 let mut t1_kv_pairs = (5..10_i32)
1336 .map(|i| {
1337 let (k, v) = gen_key_value(
1338 i as usize % 2,
1339 10 - i,
1340 0,
1341 i,
1342 i,
1343 &t1_pk_serde,
1344 &t1_pk_indices,
1345 );
1346 (k, v)
1347 })
1348 .collect_vec();
1349
1350 t1_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1351
1352 let mut t2_kv_pairs = (0..10_i32)
1353 .map(|i| {
1354 gen_key_value(
1355 i as usize % 2,
1356 10 - i,
1357 0,
1358 0,
1359 0,
1360 &t2_pk_serde,
1361 &t2_pk_indices,
1362 )
1363 })
1364 .collect_vec();
1365
1366 t2_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1367 let mut index = 0;
1368 for _ in 0..t1_kv_pairs.len() {
1369 assert!(t1_kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
1370 iter.next().await.unwrap();
1371 index += 1;
1372 }
1373
1374 assert!(iter.is_valid());
1375 assert_eq!(t1_kv_pairs.len(), index);
1376
1377 index = 0;
1378 for _ in 0..t2_kv_pairs.len() {
1379 assert!(t2_kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
1380 iter.next().await.unwrap();
1381 index += 1;
1382 }
1383
1384 assert!(!iter.is_valid());
1385 assert_eq!(t2_kv_pairs.len(), index);
1386 }
1387
1388 {
1389 let t1_iter = t1_shared_buffer_batch.clone().into_forward_iter();
1390 let t2_iter = t2_shared_buffer_batch.clone().into_forward_iter();
1391 let iter_vec = vec![t1_iter, t2_iter];
1392 let merge_iter = MergeIterator::new(iter_vec);
1393
1394 let full_key_filter_key_extractor =
1395 FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
1396
1397 let table_id_to_vnode = HashMap::from_iter(
1398 vec![
1399 (t1_id, VirtualNode::COUNT_FOR_TEST),
1400 (t2_id, VirtualNode::COUNT_FOR_TEST),
1401 ]
1402 .into_iter(),
1403 );
1404
1405 let table_id_to_watermark_serde = HashMap::from_iter(
1406 vec![
1407 (
1408 t1_id,
1409 Some((
1410 t1_pk_serde.clone(),
1411 watermark_col_serde.clone(),
1412 t1_watermark_col_idx_in_pk,
1413 )),
1414 ),
1415 (t2_id, None),
1416 ]
1417 .into_iter(),
1418 );
1419
1420 let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
1421 full_key_filter_key_extractor,
1422 table_id_to_vnode,
1423 table_id_to_watermark_serde,
1424 HashMap::default(),
1425 ));
1426
1427 let non_pk_prefix_iter = NonPkPrefixSkipWatermarkIterator::new(
1428 merge_iter,
1429 NonPkPrefixSkipWatermarkState::new(
1430 BTreeMap::from_iter(once((t1_id, t1_read_watermark.clone()))),
1431 compaction_catalog_agent_ref.clone(),
1432 ),
1433 );
1434
1435 let mut mix_iter = PkPrefixSkipWatermarkIterator::new(
1436 non_pk_prefix_iter,
1437 PkPrefixSkipWatermarkState::new(BTreeMap::from_iter(once((
1438 t2_id,
1439 t2_read_watermark.clone(),
1440 )))),
1441 );
1442
1443 mix_iter.rewind().await.unwrap();
1444 assert!(mix_iter.is_valid());
1445
1446 let mut t1_kv_pairs = (5..10_i32)
1447 .map(|i| {
1448 let (k, v) = gen_key_value(
1449 i as usize % 2,
1450 10 - i,
1451 0,
1452 i,
1453 i,
1454 &t1_pk_serde,
1455 &t1_pk_indices,
1456 );
1457 (k, v)
1458 })
1459 .collect_vec();
1460
1461 t1_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1462
1463 let mut t2_kv_pairs = (0..=5_i32)
1464 .map(|i| {
1465 gen_key_value(
1466 i as usize % 2,
1467 10 - i,
1468 0,
1469 0,
1470 0,
1471 &t2_pk_serde,
1472 &t2_pk_indices,
1473 )
1474 })
1475 .collect_vec();
1476
1477 t2_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1478
1479 let mut index = 0;
1480 for _ in 0..t1_kv_pairs.len() {
1481 assert!(
1482 t1_kv_pairs[index].0.as_ref() == mix_iter.key().user_key.table_key.as_ref()
1483 );
1484 mix_iter.next().await.unwrap();
1485 index += 1;
1486 }
1487
1488 assert!(mix_iter.is_valid());
1489 assert_eq!(t1_kv_pairs.len(), index);
1490
1491 index = 0;
1492
1493 for _ in 0..t2_kv_pairs.len() {
1494 assert!(
1495 t2_kv_pairs[index].0.as_ref() == mix_iter.key().user_key.table_key.as_ref()
1496 );
1497 mix_iter.next().await.unwrap();
1498 index += 1;
1499 }
1500
1501 assert!(!mix_iter.is_valid());
1502 assert_eq!(t2_kv_pairs.len(), index);
1503 }
1504 }
1505}