1use std::cmp::Ordering;
16use std::collections::{BTreeMap, VecDeque};
17
18use bytes::Bytes;
19use risingwave_common::catalog::TableId;
20use risingwave_common::hash::VirtualNode;
21use risingwave_common::row::Row;
22use risingwave_common::types::Datum;
23use risingwave_common::util::row_serde::OrderedRowSerde;
24use risingwave_hummock_sdk::compaction_group::hummock_version_ext::safe_epoch_read_table_watermarks_impl;
25use risingwave_hummock_sdk::key::FullKey;
26use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
27use risingwave_hummock_sdk::table_watermark::{
28 ReadTableWatermark, TableWatermarks, WatermarkDirection,
29};
30
31use super::SkipWatermarkState;
32use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
33use crate::hummock::HummockResult;
34use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
35use crate::hummock::value::HummockValue;
36use crate::monitor::StoreLocalStatistic;
37
/// A forward iterator adapter that transparently skips key-value entries that
/// are filtered out by their table watermark, as decided by the wrapped
/// watermark state `S`.
pub struct SkipWatermarkIterator<I, S> {
    // Underlying forward iterator whose entries are filtered.
    inner: I,
    // Watermark bookkeeping deciding whether the current key should be skipped.
    state: S,
    // Accumulated per-table stats deltas for all entries skipped so far.
    skipped_entry_table_stats: TableStatsMap,
    // Table id whose skipped-entry deltas are being accumulated in `last_table_stats`.
    last_table_id: Option<u32>,
    // Pending stats delta for `last_table_id`; flushed into
    // `skipped_entry_table_stats` when the table id changes.
    last_table_stats: TableStats,
}
48
impl<I: HummockIterator<Direction = Forward>, S: SkipWatermarkState> SkipWatermarkIterator<I, S> {
    /// Wraps `inner` with watermark-skipping behavior driven by `state`.
    pub fn new(inner: I, state: S) -> Self {
        Self {
            inner,
            state,
            skipped_entry_table_stats: TableStatsMap::default(),
            last_table_id: None,
            last_table_stats: TableStats::default(),
        }
    }

    /// Rebuilds the pending watermark queue in `state`.
    fn reset_watermark(&mut self) {
        self.state.reset_watermark();
    }

    /// Clears all accumulated skipped-entry statistics.
    fn reset_skipped_entry_table_stats(&mut self) {
        self.skipped_entry_table_stats = TableStatsMap::default();
        self.last_table_id = None;
        self.last_table_stats = TableStats::default();
    }

    /// Advances `inner` past every entry that `state` says should be deleted,
    /// recording a stats delta for each skipped entry. Stops at the first entry
    /// to keep, or when `inner` is exhausted.
    async fn advance_key_and_watermark(&mut self) -> HummockResult<()> {
        while self.inner.is_valid() {
            if !self.state.should_delete(&self.inner.key()) {
                break;
            }

            // Flush the previous table's pending delta when crossing a table boundary.
            if self.last_table_id.is_none_or(|last_table_id| {
                last_table_id != self.inner.key().user_key.table_id.table_id
            }) {
                self.add_last_table_stats();
                self.last_table_id = Some(self.inner.key().user_key.table_id.table_id);
            }
            // Deltas are negative: skipped entries are subtracted from table stats.
            self.last_table_stats.total_key_count -= 1;
            self.last_table_stats.total_key_size -= self.inner.key().encoded_len() as i64;
            self.last_table_stats.total_value_size -= self.inner.value().encoded_len() as i64;

            self.inner.next().await?;
        }
        self.add_last_table_stats();
        Ok(())
    }

    /// Folds the pending `last_table_stats` delta into `skipped_entry_table_stats`
    /// under `last_table_id`, resetting both. No-op when no table is pending.
    fn add_last_table_stats(&mut self) {
        let Some(last_table_id) = self.last_table_id.take() else {
            return;
        };
        let delta = std::mem::take(&mut self.last_table_stats);
        let e = self
            .skipped_entry_table_stats
            .entry(last_table_id)
            .or_default();
        e.total_key_count += delta.total_key_count;
        e.total_key_size += delta.total_key_size;
        e.total_value_size += delta.total_value_size;
    }
}
112
impl<I: HummockIterator<Direction = Forward>, S: SkipWatermarkState> HummockIterator
    for SkipWatermarkIterator<I, S>
{
    type Direction = Forward;

    async fn next(&mut self) -> HummockResult<()> {
        self.inner.next().await?;
        // Only scan forward for skippable entries while watermarks remain;
        // once the queue is drained no key can be filtered anymore.
        if self.state.has_watermark() {
            self.advance_key_and_watermark().await?;
        }
        Ok(())
    }

    fn key(&self) -> FullKey<&[u8]> {
        self.inner.key()
    }

    fn value(&self) -> HummockValue<&[u8]> {
        self.inner.value()
    }

    fn is_valid(&self) -> bool {
        self.inner.is_valid()
    }

    async fn rewind(&mut self) -> HummockResult<()> {
        // Restart filtering from scratch: rebuild the watermark queue and drop
        // previously accumulated skip statistics before positioning the iterator.
        self.reset_watermark();
        self.reset_skipped_entry_table_stats();
        self.inner.rewind().await?;
        self.advance_key_and_watermark().await?;
        Ok(())
    }

    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
        // Same reset discipline as `rewind`: a seek invalidates both the
        // consumed watermark queue and the collected skip statistics.
        self.reset_watermark();
        self.reset_skipped_entry_table_stats();
        self.inner.seek(key).await?;
        self.advance_key_and_watermark().await?;
        Ok(())
    }

    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
        // Merge the per-table skipped-entry deltas into the caller's stats,
        // then delegate to the inner iterator.
        add_table_stats_map(
            &mut stats.skipped_by_watermark_table_stats,
            &self.skipped_entry_table_stats,
        );
        self.inner.collect_local_statistic(stats)
    }

    fn value_meta(&self) -> ValueMeta {
        self.inner.value_meta()
    }
}
/// Watermark state for tables whose watermark column is the prefix of the
/// primary key, so serialized keys can be compared against the raw watermark
/// bytes directly (no deserialization needed).
pub struct PkPrefixSkipWatermarkState {
    // All read watermarks, kept so `reset_watermark` can rebuild the queue.
    watermarks: BTreeMap<TableId, ReadTableWatermark>,
    // Remaining (table, vnode) watermarks in order; consumed as keys advance.
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Bytes)>,
}
173
impl SkipWatermarkState for PkPrefixSkipWatermarkState {
    #[inline(always)]
    fn has_watermark(&self) -> bool {
        !self.remain_watermarks.is_empty()
    }

    /// Returns whether `key` is filtered out by the front watermark.
    fn should_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
        if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            let key_table_id = key.user_key.table_id;
            let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
            match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
                Ordering::Less => {
                    // Key is before the front watermark's (table, vnode): keep it.
                    return false;
                }
                Ordering::Equal => {
                    // Same (table, vnode): compare the inner key with the
                    // watermark bytes according to the watermark direction.
                    return direction.key_filter_by_watermark(inner_key, watermark);
                }
                Ordering::Greater => {
                    // Key is past the front watermark: drop exhausted watermarks
                    // and re-evaluate against the new front.
                    return self.advance_watermark(key);
                }
            }
        }
        false
    }

    /// Rebuilds `remain_watermarks` by flattening `watermarks` into an ordered
    /// queue of (table, vnode, direction, watermark-bytes) entries.
    fn reset_watermark(&mut self) {
        self.remain_watermarks = self
            .watermarks
            .iter()
            .flat_map(|(table_id, read_watermarks)| {
                read_watermarks
                    .vnode_watermarks
                    .iter()
                    .map(|(vnode, watermarks)| {
                        (
                            *table_id,
                            *vnode,
                            read_watermarks.direction,
                            watermarks.clone(),
                        )
                    })
            })
            .collect();
    }

    /// Pops watermarks whose (table, vnode) is behind `key`, then evaluates the
    /// key against the new front (when it matches the key's position).
    fn advance_watermark(&mut self, key: &FullKey<&[u8]>) -> bool {
        let key_table_id = key.user_key.table_id;
        let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
        while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
                Ordering::Less => {
                    // Watermark is behind the key and can never match again.
                    self.remain_watermarks.pop_front();
                    continue;
                }
                Ordering::Equal => {
                    match direction {
                        WatermarkDirection::Ascending => {
                            match inner_key.cmp(watermark.as_ref()) {
                                Ordering::Less => {
                                    // Below an ascending watermark: delete.
                                    return true;
                                }
                                Ordering::Equal | Ordering::Greater => {
                                    // At/above the ascending watermark: this
                                    // watermark is exhausted for all later keys.
                                    self.remain_watermarks.pop_front();
                                    #[cfg(debug_assertions)]
                                    {
                                        // The queue must stay strictly ahead of
                                        // the current key position.
                                        if let Some((next_table_id, next_vnode, _, _)) =
                                            self.remain_watermarks.front()
                                        {
                                            assert!(
                                                (next_table_id, next_vnode)
                                                    > (&key_table_id, &key_vnode)
                                            );
                                        }
                                    }
                                    return false;
                                }
                            }
                        }
                        WatermarkDirection::Descending => {
                            // Descending watermarks are not popped here: later
                            // keys in the same vnode may still be above them.
                            return match inner_key.cmp(watermark.as_ref()) {
                                Ordering::Less | Ordering::Equal => false,
                                Ordering::Greater => true,
                            };
                        }
                    }
                }
                Ordering::Greater => {
                    // Front watermark is ahead of the key: keep the key.
                    return false;
                }
            }
        }
        false
    }
}
285
286impl PkPrefixSkipWatermarkState {
287 pub fn new(watermarks: BTreeMap<TableId, ReadTableWatermark>) -> Self {
288 Self {
289 remain_watermarks: VecDeque::new(),
290 watermarks,
291 }
292 }
293
294 pub fn from_safe_epoch_watermarks(
295 safe_epoch_watermarks: BTreeMap<u32, TableWatermarks>,
296 ) -> Self {
297 let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
298 Self::new(watermarks)
299 }
300}
301
/// Watermark state for tables whose watermark column is NOT the pk prefix:
/// keys must be deserialized with the table's pk serde before the watermark
/// datum can be compared.
pub struct NonPkPrefixSkipWatermarkState {
    // All read watermarks, kept so `reset_watermark` can rebuild the queue.
    watermarks: BTreeMap<TableId, ReadTableWatermark>,
    // Remaining (table, vnode) watermarks in order, with the watermark already
    // deserialized into a `Datum`.
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Datum)>,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,

    // Cached (pk serde, watermark-column serde, watermark column index in pk)
    // for `last_table_id`, refreshed when the key's table changes.
    last_serde: Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
    last_table_id: Option<u32>,
}
310
311impl NonPkPrefixSkipWatermarkState {
312 pub fn new(
313 watermarks: BTreeMap<TableId, ReadTableWatermark>,
314 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
315 ) -> Self {
316 Self {
317 remain_watermarks: VecDeque::new(),
318 watermarks,
319 compaction_catalog_agent_ref,
320 last_serde: None,
321 last_table_id: None,
322 }
323 }
324
325 pub fn from_safe_epoch_watermarks(
326 safe_epoch_watermarks: BTreeMap<u32, TableWatermarks>,
327 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
328 ) -> Self {
329 let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
330 Self::new(watermarks, compaction_catalog_agent_ref)
331 }
332}
333
334impl SkipWatermarkState for NonPkPrefixSkipWatermarkState {
335 #[inline(always)]
336 fn has_watermark(&self) -> bool {
337 !self.remain_watermarks.is_empty()
338 }
339
340 fn should_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
341 if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
342 let key_table_id = key.user_key.table_id;
343 {
344 if self
345 .last_table_id
346 .is_none_or(|last_table_id| last_table_id != key_table_id.table_id())
347 {
348 self.last_table_id = Some(key_table_id.table_id());
349 self.last_serde = self
350 .compaction_catalog_agent_ref
351 .watermark_serde(table_id.table_id());
352 }
353 }
354
355 let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
356 match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
357 Ordering::Less => {
358 return false;
359 }
360 Ordering::Equal => {
361 let (pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk) =
362 self.last_serde.as_ref().unwrap();
363 let row = pk_prefix_serde
364 .deserialize(inner_key)
365 .unwrap_or_else(|_| {
366 panic!("Failed to deserialize pk_prefix inner_key {:?} serde data_types {:?} order_types {:?}", inner_key, pk_prefix_serde.get_data_types(), pk_prefix_serde.get_order_types());
367 });
368 let watermark_col_in_pk = row.datum_at(*watermark_col_idx_in_pk);
369 return direction.datum_filter_by_watermark(
370 watermark_col_in_pk,
371 watermark,
372 watermark_col_serde.get_order_types()[0],
373 );
374 }
375 Ordering::Greater => {
376 return self.advance_watermark(key);
379 }
380 }
381 }
382 false
383 }
384
385 fn reset_watermark(&mut self) {
386 self.remain_watermarks = self
387 .watermarks
388 .iter()
389 .flat_map(|(table_id, read_watermarks)| {
390 let watermark_serde = self.compaction_catalog_agent_ref.watermark_serde(table_id.table_id()).map(|(_pk_serde, watermark_serde, _watermark_col_idx_in_pk)| watermark_serde);
391
392 read_watermarks
393 .vnode_watermarks
394 .iter()
395 .map(move |(vnode, watermarks)| {
396 (
397 *table_id,
398 *vnode,
399 read_watermarks.direction,
400 {
401 let watermark_serde = watermark_serde.as_ref().unwrap();
402 let row = watermark_serde
403 .deserialize(watermarks).unwrap_or_else(|_| {
404 panic!("Failed to deserialize watermark {:?} serde data_types {:?} order_types {:?}", watermarks, watermark_serde.get_data_types(), watermark_serde.get_order_types());
405 });
406 row[0].clone()
407 },
408 )
409 })
410 })
411 .collect();
412 }
413
414 fn advance_watermark(&mut self, key: &FullKey<&[u8]>) -> bool {
415 let key_table_id = key.user_key.table_id;
416 let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
417 while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
418 match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
419 Ordering::Less => {
420 self.remain_watermarks.pop_front();
421 continue;
422 }
423 Ordering::Equal => {
424 let (pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk) =
425 self.last_serde.as_ref().unwrap();
426
427 let row = pk_prefix_serde
428 .deserialize(inner_key)
429 .unwrap_or_else(|_| {
430 panic!("Failed to deserialize pk_prefix inner_key {:?} serde data_types {:?} order_types {:?}", inner_key, pk_prefix_serde.get_data_types(), pk_prefix_serde.get_order_types());
431 });
432 let watermark_col_in_pk = row.datum_at(*watermark_col_idx_in_pk);
433
434 return direction.datum_filter_by_watermark(
435 watermark_col_in_pk,
436 watermark,
437 watermark_col_serde.get_order_types()[0],
438 );
439 }
440 Ordering::Greater => {
441 return false;
442 }
443 }
444 }
445 false
446 }
447}
448
/// Skip-watermark iterator for tables whose watermark column is the pk prefix.
pub type PkPrefixSkipWatermarkIterator<I> = SkipWatermarkIterator<I, PkPrefixSkipWatermarkState>;

/// Skip-watermark iterator for tables whose watermark column is not the pk prefix.
pub type NonPkPrefixSkipWatermarkIterator<I> =
    SkipWatermarkIterator<I, NonPkPrefixSkipWatermarkState>;
453
454#[cfg(test)]
455mod tests {
456 use std::collections::{BTreeMap, HashMap};
457 use std::iter::{empty, once};
458 use std::sync::Arc;
459
460 use bytes::Bytes;
461 use itertools::Itertools;
462 use risingwave_common::catalog::TableId;
463 use risingwave_common::hash::VirtualNode;
464 use risingwave_common::row::{OwnedRow, RowExt};
465 use risingwave_common::types::{DataType, ScalarImpl};
466 use risingwave_common::util::epoch::test_epoch;
467 use risingwave_common::util::row_serde::OrderedRowSerde;
468 use risingwave_common::util::sort_util::OrderType;
469 use risingwave_hummock_sdk::EpochWithGap;
470 use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey, gen_key_from_str};
471 use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, WatermarkDirection};
472
473 use super::PkPrefixSkipWatermarkState;
474 use crate::compaction_catalog_manager::{
475 CompactionCatalogAgent, FilterKeyExtractorImpl, FullKeyFilterKeyExtractor,
476 };
477 use crate::hummock::iterator::{
478 HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
479 NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator,
480 };
481 use crate::hummock::shared_buffer::shared_buffer_batch::{
482 SharedBufferBatch, SharedBufferValue,
483 };
484 use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode};
485
486 const EPOCH: u64 = test_epoch(1);
487 const TABLE_ID: TableId = TableId::new(233);
488
    // Asserts that `second` (the skip-watermark iterator) yields exactly the
    // same entries as `first` (the expected, pre-filtered iterator), after an
    // optional seek to `(vnode, key_index)`. `first == None` means the expected
    // output is empty.
    async fn assert_iter_eq(
        mut first: Option<impl HummockIterator>,
        mut second: impl HummockIterator,
        seek_key: Option<(usize, usize)>,
    ) {
        if let Some((vnode, key_index)) = seek_key {
            let (seek_key, _) = gen_key_value(vnode, key_index);
            let full_key = FullKey {
                user_key: UserKey {
                    table_id: TABLE_ID,
                    table_key: seek_key,
                },
                epoch_with_gap: EpochWithGap::new_from_epoch(EPOCH),
            };
            if let Some(first) = &mut first {
                first.seek(full_key.to_ref()).await.unwrap();
            }
            second.seek(full_key.to_ref()).await.unwrap()
        } else {
            if let Some(first) = &mut first {
                first.rewind().await.unwrap();
            }
            second.rewind().await.unwrap();
        }

        // Lock-step comparison of keys and values.
        if let Some(first) = &mut first {
            while first.is_valid() {
                assert!(second.is_valid());
                let first_key = first.key();
                let second_key = second.key();
                assert_eq!(first_key, second_key);
                assert_eq!(first.value(), second.value());
                first.next().await.unwrap();
                second.next().await.unwrap();
            }
        }
        // `second` must not yield anything beyond the expected entries.
        assert!(!second.is_valid());
    }
527
528 fn build_batch(
529 pairs: impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)>,
530 table_id: TableId,
531 ) -> Option<SharedBufferBatch> {
532 let pairs: Vec<_> = pairs.collect();
533 if pairs.is_empty() {
534 None
535 } else {
536 Some(SharedBufferBatch::for_test(pairs, EPOCH, table_id))
537 }
538 }
539
540 fn filter_with_watermarks(
541 iter: impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)>,
542 table_watermarks: ReadTableWatermark,
543 ) -> impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)> {
544 iter.filter(move |(key, _)| {
545 if let Some(watermark) = table_watermarks.vnode_watermarks.get(&key.vnode_part()) {
546 !table_watermarks
547 .direction
548 .key_filter_by_watermark(key.key_part(), watermark)
549 } else {
550 true
551 }
552 })
553 }
554
555 fn gen_inner_key(index: usize) -> String {
556 format!("key{:5}", index)
557 }
558
    // Drives a full equivalence test: builds a fixed data set, applies the
    // given per-vnode watermarks, and checks the skip-watermark iterator
    // against the reference-filtered batch for rewind, every seek position,
    // and one seek past the last key.
    async fn test_watermark(
        watermarks: impl IntoIterator<Item = (usize, usize)>,
        direction: WatermarkDirection,
    ) {
        // (vnode, key_index) pairs; vnodes 2/3 intentionally have no data.
        let test_index: [(usize, usize); 7] =
            [(0, 2), (0, 3), (0, 4), (1, 1), (1, 3), (4, 2), (8, 1)];
        let items = test_index
            .iter()
            .map(|(vnode, key_index)| gen_key_value(*vnode, *key_index))
            .collect_vec();

        let read_watermark = ReadTableWatermark {
            direction,
            vnode_watermarks: BTreeMap::from_iter(watermarks.into_iter().map(
                |(vnode, key_index)| {
                    (
                        VirtualNode::from_index(vnode),
                        Bytes::from(gen_inner_key(key_index)),
                    )
                },
            )),
        };

        // Builds a fresh (expected, actual) iterator pair for each check.
        let gen_iters = || {
            let batch = build_batch(
                filter_with_watermarks(items.clone().into_iter(), read_watermark.clone()),
                TABLE_ID,
            );

            let iter = PkPrefixSkipWatermarkIterator::new(
                build_batch(items.clone().into_iter(), TABLE_ID)
                    .unwrap()
                    .into_forward_iter(),
                PkPrefixSkipWatermarkState::new(BTreeMap::from_iter(once((
                    TABLE_ID,
                    read_watermark.clone(),
                )))),
            );
            (batch.map(|batch| batch.into_forward_iter()), iter)
        };
        let (first, second) = gen_iters();
        assert_iter_eq(first, second, None).await;
        for (vnode, key_index) in &test_index {
            let (first, second) = gen_iters();
            assert_iter_eq(first, second, Some((*vnode, *key_index))).await;
        }
        // Seek past the very last key: both iterators must be exhausted.
        let (last_vnode, last_key_index) = test_index.last().unwrap();
        let (first, second) = gen_iters();
        assert_iter_eq(first, second, Some((*last_vnode, last_key_index + 1))).await;
    }
609
610 fn gen_key_value(vnode: usize, index: usize) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
611 (
612 gen_key_from_str(VirtualNode::from_index(vnode), &gen_inner_key(index)),
613 SharedBufferValue::Insert(Bytes::copy_from_slice(
614 format!("{}-value-{}", vnode, index).as_bytes(),
615 )),
616 )
617 }
618
619 #[tokio::test]
620 async fn test_no_watermark() {
621 test_watermark(empty(), WatermarkDirection::Ascending).await;
622 test_watermark(empty(), WatermarkDirection::Descending).await;
623 }
624
625 #[tokio::test]
626 async fn test_too_low_watermark() {
627 test_watermark(vec![(0, 0)], WatermarkDirection::Ascending).await;
628 test_watermark(vec![(0, 10)], WatermarkDirection::Descending).await;
629 }
630
631 #[tokio::test]
632 async fn test_single_watermark() {
633 test_watermark(vec![(0, 3)], WatermarkDirection::Ascending).await;
634 test_watermark(vec![(0, 3)], WatermarkDirection::Descending).await;
635 }
636
637 #[tokio::test]
638 async fn test_watermark_vnode_no_data() {
639 test_watermark(vec![(3, 3)], WatermarkDirection::Ascending).await;
640 test_watermark(vec![(3, 3)], WatermarkDirection::Descending).await;
641 }
642
643 #[tokio::test]
644 async fn test_filter_all() {
645 test_watermark(
646 vec![(0, 5), (1, 4), (2, 0), (4, 3), (8, 2)],
647 WatermarkDirection::Ascending,
648 )
649 .await;
650 test_watermark(
651 vec![(0, 0), (1, 0), (2, 0), (4, 0), (8, 0)],
652 WatermarkDirection::Descending,
653 )
654 .await;
655 }
656
657 #[tokio::test]
658 async fn test_advance_multi_vnode() {
659 test_watermark(vec![(1, 2), (8, 0)], WatermarkDirection::Ascending).await;
660 }
661
    #[tokio::test]
    async fn test_non_pk_prefix_watermark() {
        // Builds a 4-column row, projects `pk_indices` out of it, and
        // serializes the projection (prefixed with `vnode`) into a table key.
        fn gen_key_value(
            vnode: usize,
            col_0: i32,
            col_1: i32,
            col_2: i32,
            col_3: i32,
            pk_serde: &OrderedRowSerde,
            pk_indices: &[usize],
        ) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
            let r = OwnedRow::new(vec![
                Some(ScalarImpl::Int32(col_0)),
                Some(ScalarImpl::Int32(col_1)),
                Some(ScalarImpl::Int32(col_2)),
                Some(ScalarImpl::Int32(col_3)),
            ]);

            let pk = r.project(pk_indices);

            let k1 = serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode));
            let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice(
                format!("{}-value-{}", vnode, col_2).as_bytes(),
            ));
            (k1, v1)
        }

        let watermark_direction = WatermarkDirection::Ascending;

        let watermark_col_serde =
            OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]);
        let pk_serde = OrderedRowSerde::new(
            vec![DataType::Int32, DataType::Int32, DataType::Int32],
            vec![
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
        );

        // pk = (col_0, col_2, col_3); the watermark column (col_2) sits at
        // index 1 of the pk, i.e. it is NOT the pk prefix.
        let pk_indices = vec![0, 2, 3];
        let watermark_col_idx_in_pk = 1;

        {
            // Rows 0..10 in vnode 0 with watermark column equal to i.
            let shared_buffer_batch = {
                let mut kv_pairs = (0..10)
                    .map(|i| gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices))
                    .collect_vec();
                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                build_batch(kv_pairs.into_iter(), TABLE_ID)
            }
            .unwrap();

            {
                // Case 1: no vnode watermarks -> nothing is skipped.
                let read_watermark = ReadTableWatermark {
                    direction: watermark_direction,
                    vnode_watermarks: BTreeMap::default(),
                };

                let compaction_catalog_agent_ref =
                    CompactionCatalogAgent::for_test(vec![TABLE_ID.into()]);

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                for i in 0..10 {
                    let (k, _v) = gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices);
                    assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref());
                    iter.next().await.unwrap();
                }
                assert!(!iter.is_valid());
            }

            {
                // Case 2: ascending watermark 5 on vnode 0 -> rows whose
                // watermark column is < 5 are skipped.
                let watermark = {
                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
                    serialize_pk(r1, &watermark_col_serde)
                };

                let read_watermark = ReadTableWatermark {
                    direction: watermark_direction,
                    vnode_watermarks: BTreeMap::from_iter(once((
                        VirtualNode::from_index(0),
                        watermark.clone(),
                    ))),
                };

                let full_key_filter_key_extractor =
                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

                let table_id_to_vnode =
                    HashMap::from_iter(once((TABLE_ID.table_id(), VirtualNode::COUNT_FOR_TEST)));

                let table_id_to_watermark_serde = HashMap::from_iter(once((
                    TABLE_ID.table_id(),
                    Some((
                        pk_serde.clone(),
                        watermark_col_serde.clone(),
                        watermark_col_idx_in_pk,
                    )),
                )));

                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                    full_key_filter_key_extractor,
                    table_id_to_vnode,
                    table_id_to_watermark_serde,
                ));

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                for i in 5..10 {
                    let (k, _v) = gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices);
                    assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref());
                    iter.next().await.unwrap();
                }
                assert!(!iter.is_valid());
            }
        }

        {
            // Rows spread across vnodes 0 and 1 (vnode = i % 2), with the
            // pk prefix (col_0 = 10 - i) DESCENDING while the watermark
            // column (col_2 = i) ascends.
            let shared_buffer_batch = {
                let mut kv_pairs = (0..10_i32)
                    .map(|i| gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices))
                    .collect_vec();

                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                build_batch(kv_pairs.into_iter(), TABLE_ID)
            };

            {
                // Case 3: ascending watermark 5 on vnodes 0 and 1 -> only rows
                // with watermark column >= 5 survive, in key order.
                let watermark = {
                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
                    serialize_pk(r1, &watermark_col_serde)
                };

                let read_watermark = ReadTableWatermark {
                    direction: watermark_direction,
                    vnode_watermarks: BTreeMap::from_iter(
                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
                    ),
                };

                let full_key_filter_key_extractor =
                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

                let table_id_to_vnode =
                    HashMap::from_iter(once((TABLE_ID.table_id(), VirtualNode::COUNT_FOR_TEST)));

                let table_id_to_watermark_serde = HashMap::from_iter(once((
                    TABLE_ID.table_id(),
                    Some((
                        pk_serde.clone(),
                        watermark_col_serde.clone(),
                        watermark_col_idx_in_pk,
                    )),
                )));

                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                    full_key_filter_key_extractor,
                    table_id_to_vnode,
                    table_id_to_watermark_serde,
                ));

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                let mut kv_pairs = (5..10_i32)
                    .map(|i| {
                        let (k, v) =
                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
                        (k, v)
                    })
                    .collect_vec();
                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                let mut index = 0;
                while iter.is_valid() {
                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                    iter.next().await.unwrap();
                    index += 1;
                }
            }

            {
                // Case 4: same setup as case 3, exercised a second time on a
                // freshly built iterator (fresh state and serde cache).
                let watermark = {
                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
                    serialize_pk(r1, &watermark_col_serde)
                };

                let read_watermark = ReadTableWatermark {
                    direction: watermark_direction,
                    vnode_watermarks: BTreeMap::from_iter(
                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
                    ),
                };

                let full_key_filter_key_extractor =
                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

                let table_id_to_vnode =
                    HashMap::from_iter(once((TABLE_ID.table_id(), VirtualNode::COUNT_FOR_TEST)));

                let table_id_to_watermark_serde = HashMap::from_iter(once((
                    TABLE_ID.table_id(),
                    Some((
                        pk_serde.clone(),
                        watermark_col_serde.clone(),
                        watermark_col_idx_in_pk,
                    )),
                )));

                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                    full_key_filter_key_extractor,
                    table_id_to_vnode,
                    table_id_to_watermark_serde,
                ));

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                let mut kv_pairs = (5..10_i32)
                    .map(|i| {
                        let (k, v) =
                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
                        (k, v)
                    })
                    .collect_vec();
                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                let mut index = 0;
                while iter.is_valid() {
                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                    iter.next().await.unwrap();
                    index += 1;
                }
            }

            {
                // Case 5: descending watermark 5 on vnodes 0 and 1 -> rows with
                // watermark column <= 5 survive instead.
                let watermark = {
                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
                    serialize_pk(r1, &watermark_col_serde)
                };

                let read_watermark = ReadTableWatermark {
                    direction: WatermarkDirection::Descending,
                    vnode_watermarks: BTreeMap::from_iter(
                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
                    ),
                };

                let full_key_filter_key_extractor =
                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

                let table_id_to_vnode =
                    HashMap::from_iter(once((TABLE_ID.table_id(), VirtualNode::COUNT_FOR_TEST)));

                let table_id_to_watermark_serde = HashMap::from_iter(once((
                    TABLE_ID.table_id(),
                    Some((
                        pk_serde.clone(),
                        watermark_col_serde.clone(),
                        watermark_col_idx_in_pk,
                    )),
                )));

                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                    full_key_filter_key_extractor,
                    table_id_to_vnode,
                    table_id_to_watermark_serde,
                ));

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                let mut kv_pairs = (0..=5_i32)
                    .map(|i| {
                        let (k, v) =
                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
                        (k, v)
                    })
                    .collect_vec();
                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                let mut index = 0;
                while iter.is_valid() {
                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                    iter.next().await.unwrap();
                    index += 1;
                }
            }
        }
    }
994
995 #[tokio::test]
996 async fn test_mix_watermark() {
997 fn gen_key_value(
998 vnode: usize,
999 col_0: i32,
1000 col_1: i32,
1001 col_2: i32,
1002 col_3: i32,
1003 pk_serde: &OrderedRowSerde,
1004 pk_indices: &[usize],
1005 ) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
1006 let r = OwnedRow::new(vec![
1007 Some(ScalarImpl::Int32(col_0)),
1008 Some(ScalarImpl::Int32(col_1)),
1009 Some(ScalarImpl::Int32(col_2)), Some(ScalarImpl::Int32(col_3)),
1011 ]);
1012
1013 let pk = r.project(pk_indices);
1014
1015 let k1 = serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode));
1016 let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice(
1017 format!("{}-value-{}-{}-{}-{}", vnode, col_0, col_1, col_2, col_3).as_bytes(),
1018 ));
1019
1020 (k1, v1)
1021 }
1022
1023 let watermark_col_serde =
1024 OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]);
1025 let t1_pk_serde = OrderedRowSerde::new(
1026 vec![DataType::Int32, DataType::Int32, DataType::Int32],
1027 vec![
1028 OrderType::ascending(),
1029 OrderType::ascending(),
1030 OrderType::ascending(),
1031 ],
1032 );
1033
1034 let t1_pk_indices = vec![0, 2, 3];
1035 let t1_watermark_col_idx_in_pk = 1;
1036
1037 let t2_pk_indices = vec![0, 1, 2];
1038
1039 let t2_pk_serde = OrderedRowSerde::new(
1040 vec![DataType::Int32, DataType::Int32, DataType::Int32],
1041 vec![
1042 OrderType::ascending(),
1043 OrderType::ascending(),
1044 OrderType::ascending(),
1045 ],
1046 );
1047
1048 let t1_id = TABLE_ID;
1049 let t2_id = TableId::from(t1_id.table_id() + 1);
1050
1051 let t1_shared_buffer_batch = {
1052 let mut kv_pairs = (0..10_i32)
1053 .map(|i| {
1054 gen_key_value(
1055 i as usize % 2,
1056 10 - i,
1057 0,
1058 i,
1059 i,
1060 &t1_pk_serde,
1061 &t1_pk_indices,
1062 )
1063 })
1064 .collect_vec();
1065
1066 kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1067 build_batch(kv_pairs.into_iter(), t1_id).unwrap()
1068 };
1069
1070 let t2_shared_buffer_batch = {
1071 let mut kv_pairs = (0..10_i32)
1072 .map(|i| {
1073 gen_key_value(
1074 i as usize % 2,
1075 10 - i,
1076 0,
1077 0,
1078 0,
1079 &t2_pk_serde,
1080 &t2_pk_indices,
1081 )
1082 })
1083 .collect_vec();
1084
1085 kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1086 build_batch(kv_pairs.into_iter(), t2_id).unwrap()
1087 };
1088
1089 let t1_watermark = {
1090 let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
1091 serialize_pk(r1, &watermark_col_serde)
1092 };
1093
1094 let t1_read_watermark = ReadTableWatermark {
1095 direction: WatermarkDirection::Ascending,
1096 vnode_watermarks: BTreeMap::from_iter(
1097 (0..2).map(|i| (VirtualNode::from_index(i), t1_watermark.clone())),
1098 ),
1099 };
1100
1101 let t2_watermark = {
1102 let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
1103 serialize_pk(r1, &watermark_col_serde)
1104 };
1105
1106 let t2_read_watermark = ReadTableWatermark {
1107 direction: WatermarkDirection::Ascending,
1108 vnode_watermarks: BTreeMap::from_iter(
1109 (0..2).map(|i| (VirtualNode::from_index(i), t2_watermark.clone())),
1110 ),
1111 };
1112
        // Scenario 1: only t1's non-pk-prefix watermark is installed.
        // Expectation: t1 entries with watermark column < 5 are skipped,
        // while every t2 entry passes through untouched.
        {
            let t1_iter = t1_shared_buffer_batch.clone().into_forward_iter();
            let t2_iter = t2_shared_buffer_batch.clone().into_forward_iter();
            let iter_vec = vec![t1_iter, t2_iter];
            let merge_iter = MergeIterator::new(iter_vec);

            let full_key_filter_key_extractor =
                FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

            // NOTE(review): only TABLE_ID is registered here even though the
            // merge iterator also yields t2 keys — presumably the agent is
            // consulted only for tables present in the watermark map, and
            // TABLE_ID is expected to equal t1_id (the serde map below is
            // keyed by t1_id). Confirm against the helpers earlier in the file.
            let table_id_to_vnode =
                HashMap::from_iter(once((TABLE_ID.table_id(), VirtualNode::COUNT_FOR_TEST)));

            // Watermark serde registered for t1 only: (full pk serde,
            // watermark-column serde, index of the watermark column in the pk).
            let table_id_to_watermark_serde = HashMap::from_iter(once((
                t1_id.table_id(),
                Some((
                    t1_pk_serde.clone(),
                    watermark_col_serde.clone(),
                    t1_watermark_col_idx_in_pk,
                )),
            )));

            let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                full_key_filter_key_extractor,
                table_id_to_vnode,
                table_id_to_watermark_serde,
            ));

            let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                merge_iter,
                NonPkPrefixSkipWatermarkState::new(
                    BTreeMap::from_iter(once((TABLE_ID, t1_read_watermark.clone()))),
                    compaction_catalog_agent_ref,
                ),
            );

            iter.rewind().await.unwrap();
            assert!(iter.is_valid());

            // Expected survivors from t1: the entries generated with i in
            // 5..10 (i.e. watermark column value >= 5), re-generated here so
            // the assertions compare against fully serialized keys.
            let mut t1_kv_pairs = (5..10_i32)
                .map(|i| {
                    let (k, v) = gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        i,
                        i,
                        &t1_pk_serde,
                        &t1_pk_indices,
                    );
                    (k, v)
                })
                .collect_vec();

            t1_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));

            // t2 has no watermark in this scenario, so all 10 entries survive.
            let mut t2_kv_pairs = (0..10_i32)
                .map(|i| {
                    gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        0,
                        0,
                        &t2_pk_serde,
                        &t2_pk_indices,
                    )
                })
                .collect_vec();

            t2_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));

            // t1 sorts before t2 in the merged stream (smaller table id), so
            // drain and check all t1 survivors first...
            let mut index = 0;
            for _ in 0..t1_kv_pairs.len() {
                assert!(t1_kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                iter.next().await.unwrap();
                index += 1;
            }

            assert!(iter.is_valid());
            assert_eq!(t1_kv_pairs.len(), index);

            // ...then all t2 entries, after which the iterator is exhausted.
            index = 0;
            for _ in 0..t2_kv_pairs.len() {
                assert!(t2_kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                iter.next().await.unwrap();
                index += 1;
            }

            assert!(!iter.is_valid());
            assert_eq!(t2_kv_pairs.len(), index);
        }
1203
        // Scenario 2: stack both iterator flavors. The inner
        // NonPkPrefixSkipWatermarkIterator filters t1 on its non-pk-prefix
        // watermark column, and the outer PkPrefixSkipWatermarkIterator
        // filters t2 on its pk prefix — each table is handled by exactly one
        // of the two layers.
        {
            let t1_iter = t1_shared_buffer_batch.clone().into_forward_iter();
            let t2_iter = t2_shared_buffer_batch.clone().into_forward_iter();
            let iter_vec = vec![t1_iter, t2_iter];
            let merge_iter = MergeIterator::new(iter_vec);

            let full_key_filter_key_extractor =
                FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

            // Unlike scenario 1, both tables are registered with the agent.
            let table_id_to_vnode = HashMap::from_iter(
                vec![
                    (t1_id.table_id(), VirtualNode::COUNT_FOR_TEST),
                    (t2_id.table_id(), VirtualNode::COUNT_FOR_TEST),
                ]
                .into_iter(),
            );

            // t2 maps to None: it has no non-pk-prefix watermark serde, since
            // its watermark is applied on the pk prefix by the outer iterator.
            let table_id_to_watermark_serde = HashMap::from_iter(
                vec![
                    (
                        t1_id.table_id(),
                        Some((
                            t1_pk_serde.clone(),
                            watermark_col_serde.clone(),
                            t1_watermark_col_idx_in_pk,
                        )),
                    ),
                    (t2_id.table_id(), None),
                ]
                .into_iter(),
            );

            let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                full_key_filter_key_extractor,
                table_id_to_vnode,
                table_id_to_watermark_serde,
            ));

            // Inner layer: non-pk-prefix watermark for t1 only.
            let non_pk_prefix_iter = NonPkPrefixSkipWatermarkIterator::new(
                merge_iter,
                NonPkPrefixSkipWatermarkState::new(
                    BTreeMap::from_iter(once((t1_id, t1_read_watermark.clone()))),
                    compaction_catalog_agent_ref.clone(),
                ),
            );

            // Outer layer: pk-prefix watermark for t2 only, wrapping the inner
            // iterator so both filters apply to the merged stream.
            let mut mix_iter = PkPrefixSkipWatermarkIterator::new(
                non_pk_prefix_iter,
                PkPrefixSkipWatermarkState::new(BTreeMap::from_iter(once((
                    t2_id,
                    t2_read_watermark.clone(),
                )))),
            );

            mix_iter.rewind().await.unwrap();
            assert!(mix_iter.is_valid());

            // Expected t1 survivors: same as scenario 1 (watermark column
            // value i >= 5, i.e. i in 5..10).
            let mut t1_kv_pairs = (5..10_i32)
                .map(|i| {
                    let (k, v) = gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        i,
                        i,
                        &t1_pk_serde,
                        &t1_pk_indices,
                    );
                    (k, v)
                })
                .collect_vec();

            t1_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));

            // Expected t2 survivors: pk prefix value is `10 - i`, and the
            // ascending pk-prefix watermark 5 keeps keys >= 5, i.e. i in 0..=5
            // (6 of the original 10 entries).
            let mut t2_kv_pairs = (0..=5_i32)
                .map(|i| {
                    gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        0,
                        0,
                        &t2_pk_serde,
                        &t2_pk_indices,
                    )
                })
                .collect_vec();

            t2_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));

            // Drain and verify t1 survivors first (t1 sorts before t2 in the
            // merged stream), then t2's, then expect exhaustion.
            let mut index = 0;
            for _ in 0..t1_kv_pairs.len() {
                assert!(
                    t1_kv_pairs[index].0.as_ref() == mix_iter.key().user_key.table_key.as_ref()
                );
                mix_iter.next().await.unwrap();
                index += 1;
            }

            assert!(mix_iter.is_valid());
            assert_eq!(t1_kv_pairs.len(), index);

            index = 0;

            for _ in 0..t2_kv_pairs.len() {
                assert!(
                    t2_kv_pairs[index].0.as_ref() == mix_iter.key().user_key.table_key.as_ref()
                );
                mix_iter.next().await.unwrap();
                index += 1;
            }

            assert!(!mix_iter.is_valid());
            assert_eq!(t2_kv_pairs.len(), index);
        }
1319 }
1320}