1use std::cmp::Ordering;
16use std::collections::{BTreeMap, VecDeque};
17
18use bytes::Bytes;
19use risingwave_common::catalog::TableId;
20use risingwave_common::hash::VirtualNode;
21use risingwave_common::row::Row;
22use risingwave_common::types::Datum;
23use risingwave_common::util::row_serde::OrderedRowSerde;
24use risingwave_hummock_sdk::compaction_group::hummock_version_ext::safe_epoch_read_table_watermarks_impl;
25use risingwave_hummock_sdk::key::FullKey;
26use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
27use risingwave_hummock_sdk::table_watermark::{
28 ReadTableWatermark, TableWatermarks, WatermarkDirection,
29};
30
31use super::SkipWatermarkState;
32use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
33use crate::hummock::HummockResult;
34use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
35use crate::hummock::value::HummockValue;
36use crate::monitor::StoreLocalStatistic;
37
/// A forward iterator wrapper that transparently skips entries whose keys are
/// filtered out by the table watermarks tracked in `state`, while accumulating
/// statistics about the skipped entries for compaction accounting.
pub struct SkipWatermarkIterator<I, S> {
    inner: I,
    state: S,
    /// Per-table stats of entries skipped by watermark, accumulated as
    /// negative deltas (key count / key size / value size).
    skipped_entry_table_stats: TableStatsMap,
    /// Table id of the most recently skipped entry; used to batch stats
    /// per table before flushing into `skipped_entry_table_stats`.
    last_table_id: Option<TableId>,
    /// Pending (negative) stats delta for `last_table_id`.
    last_table_stats: TableStats,
}
48
impl<I: HummockIterator<Direction = Forward>, S: SkipWatermarkState> SkipWatermarkIterator<I, S> {
    pub fn new(inner: I, state: S) -> Self {
        Self {
            inner,
            state,
            skipped_entry_table_stats: TableStatsMap::default(),
            last_table_id: None,
            last_table_stats: TableStats::default(),
        }
    }

    /// Rebuild the pending watermark queue in `state`.
    fn reset_watermark(&mut self) {
        self.state.reset_watermark();
    }

    /// Clear all accumulated skipped-entry statistics.
    fn reset_skipped_entry_table_stats(&mut self) {
        self.skipped_entry_table_stats = TableStatsMap::default();
        self.last_table_id = None;
        self.last_table_stats = TableStats::default();
    }

    /// Advance the inner iterator past every entry that `state` says should be
    /// deleted under the current watermarks, recording each skipped entry as a
    /// negative stats delta for its table.
    async fn advance_key_and_watermark(&mut self) -> HummockResult<()> {
        while self.inner.is_valid() {
            if !self.state.should_delete(&self.inner.key()) {
                break;
            }

            // Flush the pending per-table delta when crossing a table boundary.
            if self
                .last_table_id
                .is_none_or(|last_table_id| last_table_id != self.inner.key().user_key.table_id)
            {
                self.add_last_table_stats();
                self.last_table_id = Some(self.inner.key().user_key.table_id);
            }
            // Account the skipped entry as negative deltas.
            self.last_table_stats.total_key_count -= 1;
            self.last_table_stats.total_key_size -= self.inner.key().encoded_len() as i64;
            self.last_table_stats.total_value_size -= self.inner.value().encoded_len() as i64;

            self.inner.next().await?;
        }
        // Flush whatever is pending for the last table seen (no-op when none).
        self.add_last_table_stats();
        Ok(())
    }

    /// Merge the pending delta of `last_table_id` into
    /// `skipped_entry_table_stats` and reset the pending state.
    /// No-op when nothing is pending.
    fn add_last_table_stats(&mut self) {
        let Some(last_table_id) = self.last_table_id.take() else {
            return;
        };
        let delta = std::mem::take(&mut self.last_table_stats);
        let e = self
            .skipped_entry_table_stats
            .entry(last_table_id)
            .or_default();
        e.total_key_count += delta.total_key_count;
        e.total_key_size += delta.total_key_size;
        e.total_value_size += delta.total_value_size;
    }
}
113
impl<I: HummockIterator<Direction = Forward>, S: SkipWatermarkState> HummockIterator
    for SkipWatermarkIterator<I, S>
{
    type Direction = Forward;

    async fn next(&mut self) -> HummockResult<()> {
        self.inner.next().await?;
        // Only pay the skip cost while there are still watermarks to apply.
        if self.state.has_watermark() {
            self.advance_key_and_watermark().await?;
        }
        Ok(())
    }

    fn key(&self) -> FullKey<&[u8]> {
        self.inner.key()
    }

    fn value(&self) -> HummockValue<&[u8]> {
        self.inner.value()
    }

    fn is_valid(&self) -> bool {
        self.inner.is_valid()
    }

    async fn rewind(&mut self) -> HummockResult<()> {
        // Repositioning invalidates both the watermark queue and the stats.
        self.reset_watermark();
        self.reset_skipped_entry_table_stats();
        self.inner.rewind().await?;
        // Land on the first entry that survives the watermarks.
        self.advance_key_and_watermark().await?;
        Ok(())
    }

    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
        // Repositioning invalidates both the watermark queue and the stats.
        self.reset_watermark();
        self.reset_skipped_entry_table_stats();
        self.inner.seek(key).await?;
        // Land on the first entry at or after `key` that survives.
        self.advance_key_and_watermark().await?;
        Ok(())
    }

    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
        // Merge the accumulated skipped-entry deltas before delegating.
        add_table_stats_map(
            &mut stats.skipped_by_watermark_table_stats,
            &self.skipped_entry_table_stats,
        );
        self.inner.collect_local_statistic(stats)
    }

    fn value_meta(&self) -> ValueMeta {
        self.inner.value_meta()
    }
}
/// Watermark state that filters keys by comparing the serialized key (after
/// the vnode prefix) directly against the raw watermark bytes.
pub struct PkPrefixSkipWatermarkState {
    /// All read watermarks, keyed by table id.
    watermarks: BTreeMap<TableId, ReadTableWatermark>,
    /// Watermarks not yet passed, ordered by (table id, vnode) to match key
    /// iteration order; rebuilt by `reset_watermark`.
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Bytes)>,
}
174
impl SkipWatermarkState for PkPrefixSkipWatermarkState {
    #[inline(always)]
    fn has_watermark(&self) -> bool {
        !self.remain_watermarks.is_empty()
    }

    /// Whether `key` is filtered out by the current front watermark.
    /// May consume passed watermarks via `advance_watermark`.
    fn should_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
        if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            let key_table_id = key.user_key.table_id;
            let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
            match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
                Ordering::Less => {
                    // Key sorts before the front watermark's (table, vnode):
                    // no watermark applies to it.
                    return false;
                }
                Ordering::Equal => {
                    return direction.key_filter_by_watermark(inner_key, watermark);
                }
                Ordering::Greater => {
                    // Key moved past the front watermark: pop passed
                    // watermarks and re-check against the new front.
                    return self.advance_watermark(key);
                }
            }
        }
        false
    }

    /// Rebuild `remain_watermarks` from all watermarks, ordered by
    /// (table id, vnode) so the queue advances in key iteration order.
    fn reset_watermark(&mut self) {
        self.remain_watermarks = self
            .watermarks
            .iter()
            .flat_map(|(table_id, read_watermarks)| {
                read_watermarks
                    .vnode_watermarks
                    .iter()
                    .map(|(vnode, watermarks)| {
                        (
                            *table_id,
                            *vnode,
                            read_watermarks.direction,
                            watermarks.clone(),
                        )
                    })
            })
            .collect();
    }

    /// Pop watermarks whose (table, vnode) sorts before `key`, then decide
    /// whether `key` is filtered by the (possibly new) front watermark.
    fn advance_watermark(&mut self, key: &FullKey<&[u8]>) -> bool {
        let key_table_id = key.user_key.table_id;
        let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
        while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
                Ordering::Less => {
                    // Watermark is fully behind the key: it can never apply
                    // to any later key either, so drop it.
                    self.remain_watermarks.pop_front();
                    continue;
                }
                Ordering::Equal => {
                    match direction {
                        WatermarkDirection::Ascending => {
                            match inner_key.cmp(watermark.as_ref()) {
                                Ordering::Less => {
                                    // Below an ascending watermark: delete.
                                    return true;
                                }
                                Ordering::Equal | Ordering::Greater => {
                                    // Keys iterate in ascending order, so no
                                    // later key of this vnode can fall below
                                    // the watermark: the watermark is spent.
                                    self.remain_watermarks.pop_front();
                                    #[cfg(debug_assertions)]
                                    {
                                        // Queue must stay strictly ahead of
                                        // the current key position.
                                        if let Some((next_table_id, next_vnode, _, _)) =
                                            self.remain_watermarks.front()
                                        {
                                            assert!(
                                                (next_table_id, next_vnode)
                                                    > (&key_table_id, &key_vnode)
                                            );
                                        }
                                    }
                                    return false;
                                }
                            }
                        }
                        WatermarkDirection::Descending => {
                            // Keep the watermark: every later (greater) key of
                            // this vnode is filtered by it as well.
                            return match inner_key.cmp(watermark.as_ref()) {
                                Ordering::Less | Ordering::Equal => false,
                                Ordering::Greater => true,
                            };
                        }
                    }
                }
                Ordering::Greater => {
                    return false;
                }
            }
        }
        false
    }
}
286
287impl PkPrefixSkipWatermarkState {
288 pub fn new(watermarks: BTreeMap<TableId, ReadTableWatermark>) -> Self {
289 Self {
290 remain_watermarks: VecDeque::new(),
291 watermarks,
292 }
293 }
294
295 pub fn from_safe_epoch_watermarks(
296 safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
297 ) -> Self {
298 let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
299 Self::new(watermarks)
300 }
301}
302
/// Watermark state for tables whose watermark column is not the first pk
/// column: the serialized key is deserialized into a row and the watermark
/// column's datum is compared against the deserialized watermark value.
pub struct NonPkPrefixSkipWatermarkState {
    /// All read watermarks, keyed by table id.
    watermarks: BTreeMap<TableId, ReadTableWatermark>,
    /// Watermarks not yet passed, ordered by (table id, vnode); the watermark
    /// is stored as the deserialized `Datum` of its (single) watermark column.
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Datum)>,
    /// Supplies per-table serdes: (pk-prefix serde, watermark-column serde,
    /// watermark column index within the pk).
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,

    /// Serde tuple cached for `last_table_id`.
    last_serde: Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
    /// Table id the cached serde belongs to.
    last_table_id: Option<TableId>,
}
311
312impl NonPkPrefixSkipWatermarkState {
313 pub fn new(
314 watermarks: BTreeMap<TableId, ReadTableWatermark>,
315 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
316 ) -> Self {
317 Self {
318 remain_watermarks: VecDeque::new(),
319 watermarks,
320 compaction_catalog_agent_ref,
321 last_serde: None,
322 last_table_id: None,
323 }
324 }
325
326 pub fn from_safe_epoch_watermarks(
327 safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
328 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
329 ) -> Self {
330 let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
331 Self::new(watermarks, compaction_catalog_agent_ref)
332 }
333}
334
335impl SkipWatermarkState for NonPkPrefixSkipWatermarkState {
336 #[inline(always)]
337 fn has_watermark(&self) -> bool {
338 !self.remain_watermarks.is_empty()
339 }
340
341 fn should_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
342 if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
343 let key_table_id = key.user_key.table_id;
344 {
345 if self
346 .last_table_id
347 .is_none_or(|last_table_id| last_table_id != key_table_id)
348 {
349 self.last_table_id = Some(key_table_id);
350 self.last_serde = self.compaction_catalog_agent_ref.watermark_serde(*table_id);
351 }
352 }
353
354 let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
355 match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
356 Ordering::Less => {
357 return false;
358 }
359 Ordering::Equal => {
360 let (pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk) =
361 self.last_serde.as_ref().unwrap();
362 let row = pk_prefix_serde
363 .deserialize(inner_key)
364 .unwrap_or_else(|_| {
365 panic!("Failed to deserialize pk_prefix inner_key {:?} serde data_types {:?} order_types {:?}", inner_key, pk_prefix_serde.get_data_types(), pk_prefix_serde.get_order_types());
366 });
367 let watermark_col_in_pk = row.datum_at(*watermark_col_idx_in_pk);
368 return direction.datum_filter_by_watermark(
369 watermark_col_in_pk,
370 watermark,
371 watermark_col_serde.get_order_types()[0],
372 );
373 }
374 Ordering::Greater => {
375 return self.advance_watermark(key);
378 }
379 }
380 }
381 false
382 }
383
384 fn reset_watermark(&mut self) {
385 self.remain_watermarks = self
386 .watermarks
387 .iter()
388 .flat_map(|(table_id, read_watermarks)| {
389 let watermark_serde = self.compaction_catalog_agent_ref.watermark_serde(*table_id).map(|(_pk_serde, watermark_serde, _watermark_col_idx_in_pk)| watermark_serde);
390
391 read_watermarks
392 .vnode_watermarks
393 .iter()
394 .map(move |(vnode, watermarks)| {
395 (
396 *table_id,
397 *vnode,
398 read_watermarks.direction,
399 {
400 let watermark_serde = watermark_serde.as_ref().unwrap();
401 let row = watermark_serde
402 .deserialize(watermarks).unwrap_or_else(|_| {
403 panic!("Failed to deserialize watermark {:?} serde data_types {:?} order_types {:?}", watermarks, watermark_serde.get_data_types(), watermark_serde.get_order_types());
404 });
405 row[0].clone()
406 },
407 )
408 })
409 })
410 .collect();
411 }
412
413 fn advance_watermark(&mut self, key: &FullKey<&[u8]>) -> bool {
414 let key_table_id = key.user_key.table_id;
415 let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
416 while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
417 match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
418 Ordering::Less => {
419 self.remain_watermarks.pop_front();
420 continue;
421 }
422 Ordering::Equal => {
423 let (pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk) =
424 self.last_serde.as_ref().unwrap();
425
426 let row = pk_prefix_serde
427 .deserialize(inner_key)
428 .unwrap_or_else(|_| {
429 panic!("Failed to deserialize pk_prefix inner_key {:?} serde data_types {:?} order_types {:?}", inner_key, pk_prefix_serde.get_data_types(), pk_prefix_serde.get_order_types());
430 });
431 let watermark_col_in_pk = row.datum_at(*watermark_col_idx_in_pk);
432
433 return direction.datum_filter_by_watermark(
434 watermark_col_in_pk,
435 watermark,
436 watermark_col_serde.get_order_types()[0],
437 );
438 }
439 Ordering::Greater => {
440 return false;
441 }
442 }
443 }
444 false
445 }
446}
447
/// Skip-watermark iterator driven by raw pk-prefix (serialized key) watermarks.
pub type PkPrefixSkipWatermarkIterator<I> = SkipWatermarkIterator<I, PkPrefixSkipWatermarkState>;

/// Skip-watermark iterator driven by non-pk-prefix (watermark column) watermarks.
pub type NonPkPrefixSkipWatermarkIterator<I> =
    SkipWatermarkIterator<I, NonPkPrefixSkipWatermarkState>;
452
453#[cfg(test)]
454mod tests {
455 use std::collections::{BTreeMap, HashMap};
456 use std::iter::{empty, once};
457 use std::sync::Arc;
458
459 use bytes::Bytes;
460 use itertools::Itertools;
461 use risingwave_common::catalog::TableId;
462 use risingwave_common::hash::VirtualNode;
463 use risingwave_common::row::{OwnedRow, RowExt};
464 use risingwave_common::types::{DataType, ScalarImpl};
465 use risingwave_common::util::epoch::test_epoch;
466 use risingwave_common::util::row_serde::OrderedRowSerde;
467 use risingwave_common::util::sort_util::OrderType;
468 use risingwave_hummock_sdk::EpochWithGap;
469 use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey, gen_key_from_str};
470 use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, WatermarkDirection};
471
472 use super::PkPrefixSkipWatermarkState;
473 use crate::compaction_catalog_manager::{
474 CompactionCatalogAgent, FilterKeyExtractorImpl, FullKeyFilterKeyExtractor,
475 };
476 use crate::hummock::iterator::{
477 HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
478 NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator,
479 };
480 use crate::hummock::shared_buffer::shared_buffer_batch::{
481 SharedBufferBatch, SharedBufferValue,
482 };
483 use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode};
484
    // Epoch used for every test batch.
    const EPOCH: u64 = test_epoch(1);
    // Table id used by the single-table tests.
    const TABLE_ID: TableId = TableId::new(233);
487
    /// Drive `second` (the iterator under test) and `first` (a reference
    /// iterator over pre-filtered data) in lockstep and assert they yield the
    /// same entries; `first == None` means `second` must yield nothing.
    /// `seek_key` optionally seeks both iterators first, otherwise both are
    /// rewound.
    async fn assert_iter_eq(
        mut first: Option<impl HummockIterator>,
        mut second: impl HummockIterator,
        seek_key: Option<(usize, usize)>,
    ) {
        if let Some((vnode, key_index)) = seek_key {
            let (seek_key, _) = gen_key_value(vnode, key_index);
            let full_key = FullKey {
                user_key: UserKey {
                    table_id: TABLE_ID,
                    table_key: seek_key,
                },
                epoch_with_gap: EpochWithGap::new_from_epoch(EPOCH),
            };
            if let Some(first) = &mut first {
                first.seek(full_key.to_ref()).await.unwrap();
            }
            second.seek(full_key.to_ref()).await.unwrap()
        } else {
            if let Some(first) = &mut first {
                first.rewind().await.unwrap();
            }
            second.rewind().await.unwrap();
        }

        if let Some(first) = &mut first {
            while first.is_valid() {
                assert!(second.is_valid());
                let first_key = first.key();
                let second_key = second.key();
                assert_eq!(first_key, second_key);
                assert_eq!(first.value(), second.value());
                first.next().await.unwrap();
                second.next().await.unwrap();
            }
        }
        // Both iterators must be exhausted together.
        assert!(!second.is_valid());
    }
526
527 fn build_batch(
528 pairs: impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)>,
529 table_id: TableId,
530 ) -> Option<SharedBufferBatch> {
531 let pairs: Vec<_> = pairs.collect();
532 if pairs.is_empty() {
533 None
534 } else {
535 Some(SharedBufferBatch::for_test(pairs, EPOCH, table_id))
536 }
537 }
538
    /// Reference implementation of watermark filtering: drop each pair that
    /// its vnode's watermark would delete; pairs of vnodes without a watermark
    /// are kept.
    fn filter_with_watermarks(
        iter: impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)>,
        table_watermarks: ReadTableWatermark,
    ) -> impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)> {
        iter.filter(move |(key, _)| {
            if let Some(watermark) = table_watermarks.vnode_watermarks.get(&key.vnode_part()) {
                !table_watermarks
                    .direction
                    .key_filter_by_watermark(key.key_part(), watermark)
            } else {
                true
            }
        })
    }
553
554 fn gen_inner_key(index: usize) -> String {
555 format!("key{:5}", index)
556 }
557
    /// Compare a `PkPrefixSkipWatermarkIterator` over the full test data with
    /// a plain iterator over the same data pre-filtered by
    /// `filter_with_watermarks`: once via `rewind`, once seeking to every
    /// existing key, and once seeking past the last key.
    async fn test_watermark(
        watermarks: impl IntoIterator<Item = (usize, usize)>,
        direction: WatermarkDirection,
    ) {
        // (vnode, key_index) pairs that make up the test data set.
        let test_index: [(usize, usize); 7] =
            [(0, 2), (0, 3), (0, 4), (1, 1), (1, 3), (4, 2), (8, 1)];
        let items = test_index
            .iter()
            .map(|(vnode, key_index)| gen_key_value(*vnode, *key_index))
            .collect_vec();

        let read_watermark = ReadTableWatermark {
            direction,
            vnode_watermarks: BTreeMap::from_iter(watermarks.into_iter().map(
                |(vnode, key_index)| {
                    (
                        VirtualNode::from_index(vnode),
                        Bytes::from(gen_inner_key(key_index)),
                    )
                },
            )),
        };

        let gen_iters = || {
            // Reference: batch built from the pre-filtered items (None when
            // everything is filtered out).
            let batch = build_batch(
                filter_with_watermarks(items.clone().into_iter(), read_watermark.clone()),
                TABLE_ID,
            );

            // Under test: the full batch behind the skip-watermark iterator.
            let iter = PkPrefixSkipWatermarkIterator::new(
                build_batch(items.clone().into_iter(), TABLE_ID)
                    .unwrap()
                    .into_forward_iter(),
                PkPrefixSkipWatermarkState::new(BTreeMap::from_iter(once((
                    TABLE_ID,
                    read_watermark.clone(),
                )))),
            );
            (batch.map(|batch| batch.into_forward_iter()), iter)
        };
        let (first, second) = gen_iters();
        assert_iter_eq(first, second, None).await;
        for (vnode, key_index) in &test_index {
            let (first, second) = gen_iters();
            assert_iter_eq(first, second, Some((*vnode, *key_index))).await;
        }
        // Seek past the greatest existing key: both must be exhausted.
        let (last_vnode, last_key_index) = test_index.last().unwrap();
        let (first, second) = gen_iters();
        assert_iter_eq(first, second, Some((*last_vnode, last_key_index + 1))).await;
    }
608
    /// Build a kv pair for `vnode` whose key is `gen_inner_key(index)` and
    /// whose value encodes both the vnode and the index.
    fn gen_key_value(vnode: usize, index: usize) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
        (
            gen_key_from_str(VirtualNode::from_index(vnode), &gen_inner_key(index)),
            SharedBufferValue::Insert(Bytes::copy_from_slice(
                format!("{}-value-{}", vnode, index).as_bytes(),
            )),
        )
    }
617
    // No watermark entries at all: the iterator must be a pass-through.
    #[tokio::test]
    async fn test_no_watermark() {
        test_watermark(empty(), WatermarkDirection::Ascending).await;
        test_watermark(empty(), WatermarkDirection::Descending).await;
    }
623
    // Watermarks that filter out no key (below / above every key of vnode 0).
    #[tokio::test]
    async fn test_too_low_watermark() {
        test_watermark(vec![(0, 0)], WatermarkDirection::Ascending).await;
        test_watermark(vec![(0, 10)], WatermarkDirection::Descending).await;
    }
629
    // A single watermark on vnode 0 that filters part of its keys.
    #[tokio::test]
    async fn test_single_watermark() {
        test_watermark(vec![(0, 3)], WatermarkDirection::Ascending).await;
        test_watermark(vec![(0, 3)], WatermarkDirection::Descending).await;
    }
635
    // A watermark on vnode 3, which has no keys in the test data set.
    #[tokio::test]
    async fn test_watermark_vnode_no_data() {
        test_watermark(vec![(3, 3)], WatermarkDirection::Ascending).await;
        test_watermark(vec![(3, 3)], WatermarkDirection::Descending).await;
    }
641
    // Watermarks that filter out every key of every vnode.
    #[tokio::test]
    async fn test_filter_all() {
        test_watermark(
            vec![(0, 5), (1, 4), (2, 0), (4, 3), (8, 2)],
            WatermarkDirection::Ascending,
        )
        .await;
        test_watermark(
            vec![(0, 0), (1, 0), (2, 0), (4, 0), (8, 0)],
            WatermarkDirection::Descending,
        )
        .await;
    }
655
    // Advancing over a vnode gap: vnodes 2..=7 have watermarks/keys skipped.
    #[tokio::test]
    async fn test_advance_multi_vnode() {
        test_watermark(vec![(1, 2), (8, 0)], WatermarkDirection::Ascending).await;
    }
660
    #[tokio::test]
    async fn test_non_pk_prefix_watermark() {
        // Build a kv pair for `vnode` from a 4-column row; the pk is the
        // projection of `pk_indices` over the row.
        fn gen_key_value(
            vnode: usize,
            col_0: i32,
            col_1: i32,
            col_2: i32,
            col_3: i32,
            pk_serde: &OrderedRowSerde,
            pk_indices: &[usize],
        ) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
            let r = OwnedRow::new(vec![
                Some(ScalarImpl::Int32(col_0)),
                Some(ScalarImpl::Int32(col_1)),
                Some(ScalarImpl::Int32(col_2)), Some(ScalarImpl::Int32(col_3)),
            ]);

            let pk = r.project(pk_indices);

            let k1 = serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode));
            let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice(
                format!("{}-value-{}", vnode, col_2).as_bytes(),
            ));
            (k1, v1)
        }

        let watermark_direction = WatermarkDirection::Ascending;

        let watermark_col_serde =
            OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]);
        let pk_serde = OrderedRowSerde::new(
            vec![DataType::Int32, DataType::Int32, DataType::Int32],
            vec![
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
        );

        // Pk columns are (col_0, col_2, col_3); the watermark applies to pk
        // position 1, i.e. `col_2` — not the pk prefix.
        let pk_indices = vec![0, 2, 3];
        let watermark_col_idx_in_pk = 1;

        {
            // All keys on vnode 0; col_2 (the watermark column) is 0..10.
            let shared_buffer_batch = {
                let mut kv_pairs = (0..10)
                    .map(|i| gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices))
                    .collect_vec();
                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                build_batch(kv_pairs.into_iter(), TABLE_ID)
            }
            .unwrap();

            // Case 1: no vnode watermark entries -> nothing is skipped.
            {
                let read_watermark = ReadTableWatermark {
                    direction: watermark_direction,
                    vnode_watermarks: BTreeMap::default(),
                };

                let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![TABLE_ID]);

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                for i in 0..10 {
                    let (k, _v) = gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices);
                    assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref());
                    iter.next().await.unwrap();
                }
                assert!(!iter.is_valid());
            }

            // Case 2: ascending watermark 5 on vnode 0 -> keys with
            // col_2 < 5 are skipped, 5..10 remain.
            {
                let watermark = {
                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
                    serialize_pk(r1, &watermark_col_serde)
                };

                let read_watermark = ReadTableWatermark {
                    direction: watermark_direction,
                    vnode_watermarks: BTreeMap::from_iter(once((
                        VirtualNode::from_index(0),
                        watermark.clone(),
                    ))),
                };

                let full_key_filter_key_extractor =
                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

                let table_id_to_vnode =
                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

                let table_id_to_watermark_serde = HashMap::from_iter(once((
                    TABLE_ID,
                    Some((
                        pk_serde.clone(),
                        watermark_col_serde.clone(),
                        watermark_col_idx_in_pk,
                    )),
                )));

                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                    full_key_filter_key_extractor,
                    table_id_to_vnode,
                    table_id_to_watermark_serde,
                ));

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                for i in 5..10 {
                    let (k, _v) = gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices);
                    assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref());
                    iter.next().await.unwrap();
                }
                assert!(!iter.is_valid());
            }
        }

        {
            // Keys spread across vnodes 0 and 1 with a descending pk prefix
            // (col_0 = 10 - i) so pk order differs from col_2 order.
            let shared_buffer_batch = {
                let mut kv_pairs = (0..10_i32)
                    .map(|i| gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices))
                    .collect_vec();

                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                build_batch(kv_pairs.into_iter(), TABLE_ID)
            };

            // Case 3: ascending watermark 5 on vnodes 0 and 1 -> only keys
            // with col_2 >= 5 remain.
            {
                let watermark = {
                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
                    serialize_pk(r1, &watermark_col_serde)
                };

                let read_watermark = ReadTableWatermark {
                    direction: watermark_direction,
                    vnode_watermarks: BTreeMap::from_iter(
                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
                    ),
                };

                let full_key_filter_key_extractor =
                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

                let table_id_to_vnode =
                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

                let table_id_to_watermark_serde = HashMap::from_iter(once((
                    TABLE_ID,
                    Some((
                        pk_serde.clone(),
                        watermark_col_serde.clone(),
                        watermark_col_idx_in_pk,
                    )),
                )));

                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                    full_key_filter_key_extractor,
                    table_id_to_vnode,
                    table_id_to_watermark_serde,
                ));

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                let mut kv_pairs = (5..10_i32)
                    .map(|i| {
                        let (k, v) =
                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
                        (k, v)
                    })
                    .collect_vec();
                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                let mut index = 0;
                while iter.is_valid() {
                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                    iter.next().await.unwrap();
                    index += 1;
                }
            }

            // Case 4: same setup as case 3 with a freshly built state,
            // exercising iterator construction from scratch again.
            {
                let watermark = {
                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
                    serialize_pk(r1, &watermark_col_serde)
                };

                let read_watermark = ReadTableWatermark {
                    direction: watermark_direction,
                    vnode_watermarks: BTreeMap::from_iter(
                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
                    ),
                };

                let full_key_filter_key_extractor =
                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

                let table_id_to_vnode =
                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

                let table_id_to_watermark_serde = HashMap::from_iter(once((
                    TABLE_ID,
                    Some((
                        pk_serde.clone(),
                        watermark_col_serde.clone(),
                        watermark_col_idx_in_pk,
                    )),
                )));

                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                    full_key_filter_key_extractor,
                    table_id_to_vnode,
                    table_id_to_watermark_serde,
                ));

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                let mut kv_pairs = (5..10_i32)
                    .map(|i| {
                        let (k, v) =
                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
                        (k, v)
                    })
                    .collect_vec();
                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                let mut index = 0;
                while iter.is_valid() {
                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                    iter.next().await.unwrap();
                    index += 1;
                }
            }

            // Case 5: descending watermark 5 -> keys with col_2 > 5 are
            // skipped, 0..=5 remain.
            {
                let watermark = {
                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
                    serialize_pk(r1, &watermark_col_serde)
                };

                let read_watermark = ReadTableWatermark {
                    direction: WatermarkDirection::Descending,
                    vnode_watermarks: BTreeMap::from_iter(
                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
                    ),
                };

                let full_key_filter_key_extractor =
                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

                let table_id_to_vnode =
                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

                let table_id_to_watermark_serde = HashMap::from_iter(once((
                    TABLE_ID,
                    Some((
                        pk_serde.clone(),
                        watermark_col_serde.clone(),
                        watermark_col_idx_in_pk,
                    )),
                )));

                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                    full_key_filter_key_extractor,
                    table_id_to_vnode,
                    table_id_to_watermark_serde,
                ));

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                let mut kv_pairs = (0..=5_i32)
                    .map(|i| {
                        let (k, v) =
                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
                        (k, v)
                    })
                    .collect_vec();
                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                let mut index = 0;
                while iter.is_valid() {
                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                    iter.next().await.unwrap();
                    index += 1;
                }
            }
        }
    }
992
993 #[tokio::test]
994 async fn test_mix_watermark() {
995 fn gen_key_value(
996 vnode: usize,
997 col_0: i32,
998 col_1: i32,
999 col_2: i32,
1000 col_3: i32,
1001 pk_serde: &OrderedRowSerde,
1002 pk_indices: &[usize],
1003 ) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
1004 let r = OwnedRow::new(vec![
1005 Some(ScalarImpl::Int32(col_0)),
1006 Some(ScalarImpl::Int32(col_1)),
1007 Some(ScalarImpl::Int32(col_2)), Some(ScalarImpl::Int32(col_3)),
1009 ]);
1010
1011 let pk = r.project(pk_indices);
1012
1013 let k1 = serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode));
1014 let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice(
1015 format!("{}-value-{}-{}-{}-{}", vnode, col_0, col_1, col_2, col_3).as_bytes(),
1016 ));
1017
1018 (k1, v1)
1019 }
1020
1021 let watermark_col_serde =
1022 OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]);
1023 let t1_pk_serde = OrderedRowSerde::new(
1024 vec![DataType::Int32, DataType::Int32, DataType::Int32],
1025 vec![
1026 OrderType::ascending(),
1027 OrderType::ascending(),
1028 OrderType::ascending(),
1029 ],
1030 );
1031
1032 let t1_pk_indices = vec![0, 2, 3];
1033 let t1_watermark_col_idx_in_pk = 1;
1034
1035 let t2_pk_indices = vec![0, 1, 2];
1036
1037 let t2_pk_serde = OrderedRowSerde::new(
1038 vec![DataType::Int32, DataType::Int32, DataType::Int32],
1039 vec![
1040 OrderType::ascending(),
1041 OrderType::ascending(),
1042 OrderType::ascending(),
1043 ],
1044 );
1045
1046 let t1_id = TABLE_ID;
1047 let t2_id = TableId::from(t1_id.as_raw_id() + 1);
1048
1049 let t1_shared_buffer_batch = {
1050 let mut kv_pairs = (0..10_i32)
1051 .map(|i| {
1052 gen_key_value(
1053 i as usize % 2,
1054 10 - i,
1055 0,
1056 i,
1057 i,
1058 &t1_pk_serde,
1059 &t1_pk_indices,
1060 )
1061 })
1062 .collect_vec();
1063
1064 kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1065 build_batch(kv_pairs.into_iter(), t1_id).unwrap()
1066 };
1067
1068 let t2_shared_buffer_batch = {
1069 let mut kv_pairs = (0..10_i32)
1070 .map(|i| {
1071 gen_key_value(
1072 i as usize % 2,
1073 10 - i,
1074 0,
1075 0,
1076 0,
1077 &t2_pk_serde,
1078 &t2_pk_indices,
1079 )
1080 })
1081 .collect_vec();
1082
1083 kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1084 build_batch(kv_pairs.into_iter(), t2_id).unwrap()
1085 };
1086
1087 let t1_watermark = {
1088 let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
1089 serialize_pk(r1, &watermark_col_serde)
1090 };
1091
1092 let t1_read_watermark = ReadTableWatermark {
1093 direction: WatermarkDirection::Ascending,
1094 vnode_watermarks: BTreeMap::from_iter(
1095 (0..2).map(|i| (VirtualNode::from_index(i), t1_watermark.clone())),
1096 ),
1097 };
1098
1099 let t2_watermark = {
1100 let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
1101 serialize_pk(r1, &watermark_col_serde)
1102 };
1103
1104 let t2_read_watermark = ReadTableWatermark {
1105 direction: WatermarkDirection::Ascending,
1106 vnode_watermarks: BTreeMap::from_iter(
1107 (0..2).map(|i| (VirtualNode::from_index(i), t2_watermark.clone())),
1108 ),
1109 };
1110
1111 {
1112 let t1_iter = t1_shared_buffer_batch.clone().into_forward_iter();
1114 let t2_iter = t2_shared_buffer_batch.clone().into_forward_iter();
1115 let iter_vec = vec![t1_iter, t2_iter];
1116 let merge_iter = MergeIterator::new(iter_vec);
1117
1118 let full_key_filter_key_extractor =
1119 FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
1120
1121 let table_id_to_vnode =
1122 HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));
1123
1124 let table_id_to_watermark_serde = HashMap::from_iter(once((
1125 t1_id,
1126 Some((
1127 t1_pk_serde.clone(),
1128 watermark_col_serde.clone(),
1129 t1_watermark_col_idx_in_pk,
1130 )),
1131 )));
1132
1133 let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
1134 full_key_filter_key_extractor,
1135 table_id_to_vnode,
1136 table_id_to_watermark_serde,
1137 ));
1138
1139 let mut iter = NonPkPrefixSkipWatermarkIterator::new(
1140 merge_iter,
1141 NonPkPrefixSkipWatermarkState::new(
1142 BTreeMap::from_iter(once((TABLE_ID, t1_read_watermark.clone()))),
1143 compaction_catalog_agent_ref,
1144 ),
1145 );
1146
1147 iter.rewind().await.unwrap();
1148 assert!(iter.is_valid());
1149 let mut t1_kv_pairs = (5..10_i32)
1150 .map(|i| {
1151 let (k, v) = gen_key_value(
1152 i as usize % 2,
1153 10 - i,
1154 0,
1155 i,
1156 i,
1157 &t1_pk_serde,
1158 &t1_pk_indices,
1159 );
1160 (k, v)
1161 })
1162 .collect_vec();
1163
1164 t1_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1165
1166 let mut t2_kv_pairs = (0..10_i32)
1167 .map(|i| {
1168 gen_key_value(
1169 i as usize % 2,
1170 10 - i,
1171 0,
1172 0,
1173 0,
1174 &t2_pk_serde,
1175 &t2_pk_indices,
1176 )
1177 })
1178 .collect_vec();
1179
1180 t2_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1181 let mut index = 0;
1182 for _ in 0..t1_kv_pairs.len() {
1183 assert!(t1_kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
1184 iter.next().await.unwrap();
1185 index += 1;
1186 }
1187
1188 assert!(iter.is_valid());
1189 assert_eq!(t1_kv_pairs.len(), index);
1190
1191 index = 0;
1192 for _ in 0..t2_kv_pairs.len() {
1193 assert!(t2_kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
1194 iter.next().await.unwrap();
1195 index += 1;
1196 }
1197
1198 assert!(!iter.is_valid());
1199 assert_eq!(t2_kv_pairs.len(), index);
1200 }
1201
1202 {
1203 let t1_iter = t1_shared_buffer_batch.clone().into_forward_iter();
1204 let t2_iter = t2_shared_buffer_batch.clone().into_forward_iter();
1205 let iter_vec = vec![t1_iter, t2_iter];
1206 let merge_iter = MergeIterator::new(iter_vec);
1207
1208 let full_key_filter_key_extractor =
1209 FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
1210
1211 let table_id_to_vnode = HashMap::from_iter(
1212 vec![
1213 (t1_id, VirtualNode::COUNT_FOR_TEST),
1214 (t2_id, VirtualNode::COUNT_FOR_TEST),
1215 ]
1216 .into_iter(),
1217 );
1218
1219 let table_id_to_watermark_serde = HashMap::from_iter(
1220 vec![
1221 (
1222 t1_id,
1223 Some((
1224 t1_pk_serde.clone(),
1225 watermark_col_serde.clone(),
1226 t1_watermark_col_idx_in_pk,
1227 )),
1228 ),
1229 (t2_id, None),
1230 ]
1231 .into_iter(),
1232 );
1233
1234 let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
1235 full_key_filter_key_extractor,
1236 table_id_to_vnode,
1237 table_id_to_watermark_serde,
1238 ));
1239
1240 let non_pk_prefix_iter = NonPkPrefixSkipWatermarkIterator::new(
1241 merge_iter,
1242 NonPkPrefixSkipWatermarkState::new(
1243 BTreeMap::from_iter(once((t1_id, t1_read_watermark.clone()))),
1244 compaction_catalog_agent_ref.clone(),
1245 ),
1246 );
1247
1248 let mut mix_iter = PkPrefixSkipWatermarkIterator::new(
1249 non_pk_prefix_iter,
1250 PkPrefixSkipWatermarkState::new(BTreeMap::from_iter(once((
1251 t2_id,
1252 t2_read_watermark.clone(),
1253 )))),
1254 );
1255
1256 mix_iter.rewind().await.unwrap();
1257 assert!(mix_iter.is_valid());
1258
1259 let mut t1_kv_pairs = (5..10_i32)
1260 .map(|i| {
1261 let (k, v) = gen_key_value(
1262 i as usize % 2,
1263 10 - i,
1264 0,
1265 i,
1266 i,
1267 &t1_pk_serde,
1268 &t1_pk_indices,
1269 );
1270 (k, v)
1271 })
1272 .collect_vec();
1273
1274 t1_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1275
1276 let mut t2_kv_pairs = (0..=5_i32)
1277 .map(|i| {
1278 gen_key_value(
1279 i as usize % 2,
1280 10 - i,
1281 0,
1282 0,
1283 0,
1284 &t2_pk_serde,
1285 &t2_pk_indices,
1286 )
1287 })
1288 .collect_vec();
1289
1290 t2_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1291
1292 let mut index = 0;
1293 for _ in 0..t1_kv_pairs.len() {
1294 assert!(
1295 t1_kv_pairs[index].0.as_ref() == mix_iter.key().user_key.table_key.as_ref()
1296 );
1297 mix_iter.next().await.unwrap();
1298 index += 1;
1299 }
1300
1301 assert!(mix_iter.is_valid());
1302 assert_eq!(t1_kv_pairs.len(), index);
1303
1304 index = 0;
1305
1306 for _ in 0..t2_kv_pairs.len() {
1307 assert!(
1308 t2_kv_pairs[index].0.as_ref() == mix_iter.key().user_key.table_key.as_ref()
1309 );
1310 mix_iter.next().await.unwrap();
1311 index += 1;
1312 }
1313
1314 assert!(!mix_iter.is_valid());
1315 assert_eq!(t2_kv_pairs.len(), index);
1316 }
1317 }
1318}