use std::cmp::Ordering;
use std::collections::{BTreeMap, VecDeque};

use bytes::Bytes;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_common::row::Row;
use risingwave_common::types::Datum;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::safe_epoch_read_table_watermarks_impl;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
use risingwave_hummock_sdk::table_watermark::{
    ReadTableWatermark, TableWatermarks, WatermarkDirection,
};

use super::SkipWatermarkState;
use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
use crate::hummock::HummockResult;
use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
use crate::hummock::value::HummockValue;
use crate::monitor::StoreLocalStatistic;

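/// An iterator adapter over a forward `HummockIterator` that skips key-value pairs filtered
/// out by per-table, per-vnode watermarks, and records the table stats of the skipped entries.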
pub struct SkipWatermarkIterator<I, S> {
    inner: I,
    state: S,
    /// Stats of the key-value pairs skipped so far, grouped by table id.
    skipped_entry_table_stats: TableStatsMap,
    /// Table id of the most recently skipped key.
    last_table_id: Option<TableId>,
    /// Accumulated stats for `last_table_id`, merged into `skipped_entry_table_stats`
    /// whenever the table id changes.
    last_table_stats: TableStats,
}

impl<I: HummockIterator<Direction = Forward>, S: SkipWatermarkState> SkipWatermarkIterator<I, S> {
    pub fn new(inner: I, state: S) -> Self {
        Self {
            inner,
            state,
            skipped_entry_table_stats: TableStatsMap::default(),
            last_table_id: None,
            last_table_stats: TableStats::default(),
        }
    }

    fn reset_watermark(&mut self) {
        self.state.reset_watermark();
    }

    fn reset_skipped_entry_table_stats(&mut self) {
        self.skipped_entry_table_stats = TableStatsMap::default();
        self.last_table_id = None;
        self.last_table_stats = TableStats::default();
    }

    /// Advance the inner iterator while the current key is filtered by a remaining watermark,
    /// accumulating table stats for every skipped entry.
    async fn advance_key_and_watermark(&mut self) -> HummockResult<()> {
        while self.inner.is_valid() {
            if !self.state.should_delete(&self.inner.key()) {
                break;
            }

            // Flush the accumulated stats whenever the table id of the skipped key changes.
            if self
                .last_table_id
                .is_none_or(|last_table_id| last_table_id != self.inner.key().user_key.table_id)
            {
                self.add_last_table_stats();
                self.last_table_id = Some(self.inner.key().user_key.table_id);
            }
            self.last_table_stats.total_key_count -= 1;
            self.last_table_stats.total_key_size -= self.inner.key().encoded_len() as i64;
            self.last_table_stats.total_value_size -= self.inner.value().encoded_len() as i64;

            self.inner.next().await?;
        }
        self.add_last_table_stats();
        Ok(())
    }

    fn add_last_table_stats(&mut self) {
        let Some(last_table_id) = self.last_table_id.take() else {
            return;
        };
        let delta = std::mem::take(&mut self.last_table_stats);
        let e = self
            .skipped_entry_table_stats
            .entry(last_table_id)
            .or_default();
        e.total_key_count += delta.total_key_count;
        e.total_key_size += delta.total_key_size;
        e.total_value_size += delta.total_value_size;
    }
}

impl<I: HummockIterator<Direction = Forward>, S: SkipWatermarkState> HummockIterator
    for SkipWatermarkIterator<I, S>
{
    type Direction = Forward;

    async fn next(&mut self) -> HummockResult<()> {
        self.inner.next().await?;
        // Only run watermark filtering when there are watermarks left to apply.
        if self.state.has_watermark() {
            self.advance_key_and_watermark().await?;
        }
        Ok(())
    }

    fn key(&self) -> FullKey<&[u8]> {
        self.inner.key()
    }

    fn value(&self) -> HummockValue<&[u8]> {
        self.inner.value()
    }

    fn is_valid(&self) -> bool {
        self.inner.is_valid()
    }

    async fn rewind(&mut self) -> HummockResult<()> {
        self.reset_watermark();
        self.reset_skipped_entry_table_stats();
        self.inner.rewind().await?;
        self.advance_key_and_watermark().await?;
        Ok(())
    }

    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
        self.reset_watermark();
        self.reset_skipped_entry_table_stats();
        self.inner.seek(key).await?;
        self.advance_key_and_watermark().await?;
        Ok(())
    }

    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
        add_table_stats_map(
            &mut stats.skipped_by_watermark_table_stats,
            &self.skipped_entry_table_stats,
        );
        self.inner.collect_local_statistic(stats)
    }

    fn value_meta(&self) -> ValueMeta {
        self.inner.value_meta()
    }
}
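
/// Watermark state for tables whose watermark is encoded as a prefix of the primary key, so the
/// serialized watermark can be compared directly with the key bytes following the vnode.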
pub struct PkPrefixSkipWatermarkState {
    watermarks: BTreeMap<TableId, ReadTableWatermark>,
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Bytes)>,
}

impl SkipWatermarkState for PkPrefixSkipWatermarkState {
    #[inline(always)]
    fn has_watermark(&self) -> bool {
        !self.remain_watermarks.is_empty()
    }

    fn should_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
        if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            let key_table_id = key.user_key.table_id;
            let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
            match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
                Ordering::Less => {
                    return false;
                }
                Ordering::Equal => {
                    return direction.key_filter_by_watermark(inner_key, watermark);
                }
                Ordering::Greater => {
                    // The key is beyond the front watermark: advance the watermark queue and
                    // check the key against the watermark that covers it, if any.
                    return self.advance_watermark(key);
                }
            }
        }
        false
    }

    fn reset_watermark(&mut self) {
        self.remain_watermarks = self
            .watermarks
            .iter()
            .flat_map(|(table_id, read_watermarks)| {
                read_watermarks
                    .vnode_watermarks
                    .iter()
                    .map(|(vnode, watermarks)| {
                        (
                            *table_id,
                            *vnode,
                            read_watermarks.direction,
                            watermarks.clone(),
                        )
                    })
            })
            .collect();
    }

    /// Pop watermarks whose `(table_id, vnode)` is smaller than the key's, then return whether
    /// the key is filtered by the watermark covering its `(table_id, vnode)`, if any.
    fn advance_watermark(&mut self, key: &FullKey<&[u8]>) -> bool {
        let key_table_id = key.user_key.table_id;
        let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
        while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
                Ordering::Less => {
                    self.remain_watermarks.pop_front();
                    continue;
                }
                Ordering::Equal => {
                    match direction {
                        WatermarkDirection::Ascending => {
                            match inner_key.cmp(watermark.as_ref()) {
                                Ordering::Less => {
                                    // Key is below the ascending watermark and should be skipped.
                                    return true;
                                }
                                Ordering::Equal | Ordering::Greater => {
                                    // Key has reached the watermark, so all later keys of this
                                    // vnode are retained and the watermark is no longer needed.
                                    self.remain_watermarks.pop_front();
                                    #[cfg(debug_assertions)]
                                    {
                                        if let Some((next_table_id, next_vnode, _, _)) =
                                            self.remain_watermarks.front()
                                        {
                                            assert!(
                                                (next_table_id, next_vnode)
                                                    > (&key_table_id, &key_vnode)
                                            );
                                        }
                                    }
                                    return false;
                                }
                            }
                        }
                        WatermarkDirection::Descending => {
                            return match inner_key.cmp(watermark.as_ref()) {
                                // Keys at or below a descending watermark are retained;
                                // keys above it are skipped.
                                Ordering::Less | Ordering::Equal => false,
                                Ordering::Greater => true,
                            };
                        }
                    }
                }
                Ordering::Greater => {
                    return false;
                }
            }
        }
        false
    }
}

impl PkPrefixSkipWatermarkState {
    pub fn new(watermarks: BTreeMap<TableId, ReadTableWatermark>) -> Self {
        Self {
            remain_watermarks: VecDeque::new(),
            watermarks,
        }
    }

    pub fn from_safe_epoch_watermarks(
        safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
    ) -> Self {
        let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
        Self::new(watermarks)
    }
}

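/// Watermark state for tables whose watermark column is not a prefix of the primary key: keys
/// must be deserialized with the catalog-provided serde so the watermark column inside the
/// primary key can be compared against the watermark datum.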
pub struct NonPkPrefixSkipWatermarkState {
    watermarks: BTreeMap<TableId, ReadTableWatermark>,
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Datum)>,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,

    /// Cached `(pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk)` for `last_table_id`.
    last_serde: Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
    last_table_id: Option<TableId>,
}

impl NonPkPrefixSkipWatermarkState {
    pub fn new(
        watermarks: BTreeMap<TableId, ReadTableWatermark>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> Self {
        Self {
            remain_watermarks: VecDeque::new(),
            watermarks,
            compaction_catalog_agent_ref,
            last_serde: None,
            last_table_id: None,
        }
    }

    pub fn from_safe_epoch_watermarks(
        safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> Self {
        let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
        Self::new(watermarks, compaction_catalog_agent_ref)
    }
}

impl SkipWatermarkState for NonPkPrefixSkipWatermarkState {
    #[inline(always)]
    fn has_watermark(&self) -> bool {
        !self.remain_watermarks.is_empty()
    }

    fn should_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
        if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            let key_table_id = key.user_key.table_id;
            // Refresh the cached serde when the key moves to a different table.
            if self
                .last_table_id
                .is_none_or(|last_table_id| last_table_id != key_table_id)
            {
                self.last_table_id = Some(key_table_id);
                self.last_serde = self.compaction_catalog_agent_ref.watermark_serde(*table_id);
            }

            let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
            match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
                Ordering::Less => {
                    return false;
                }
                Ordering::Equal => {
                    let (pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk) =
                        self.last_serde.as_ref().unwrap();
                    let row = pk_prefix_serde.deserialize(inner_key).unwrap_or_else(|_| {
                        panic!(
                            "Failed to deserialize pk_prefix inner_key {:?} serde data_types {:?} order_types {:?}",
                            inner_key,
                            pk_prefix_serde.get_data_types(),
                            pk_prefix_serde.get_order_types()
                        );
                    });
                    let watermark_col_in_pk = row.datum_at(*watermark_col_idx_in_pk);
                    return direction.datum_filter_by_watermark(
                        watermark_col_in_pk,
                        watermark,
                        watermark_col_serde.get_order_types()[0],
                    );
                }
                Ordering::Greater => {
                    return self.advance_watermark(key);
                }
            }
        }
        false
    }

    fn reset_watermark(&mut self) {
        self.remain_watermarks = self
            .watermarks
            .iter()
            .flat_map(|(table_id, read_watermarks)| {
                let watermark_serde = self
                    .compaction_catalog_agent_ref
                    .watermark_serde(*table_id)
                    .map(|(_pk_serde, watermark_serde, _watermark_col_idx_in_pk)| watermark_serde);

                read_watermarks
                    .vnode_watermarks
                    .iter()
                    .flat_map(move |(vnode, watermarks)| {
                        let watermark_serde = watermark_serde.as_ref()?;
                        Some((
                            *table_id,
                            *vnode,
                            read_watermarks.direction,
                            {
                                let row =
                                    watermark_serde.deserialize(watermarks).unwrap_or_else(|_| {
                                        panic!(
                                            "Failed to deserialize watermark {:?} serde data_types {:?} order_types {:?}",
                                            watermarks,
                                            watermark_serde.get_data_types(),
                                            watermark_serde.get_order_types()
                                        );
                                    });
                                row[0].clone()
                            },
                        ))
                    })
            })
            .collect();
    }

    fn advance_watermark(&mut self, key: &FullKey<&[u8]>) -> bool {
        let key_table_id = key.user_key.table_id;
        let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
        while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
                Ordering::Less => {
                    self.remain_watermarks.pop_front();
                    continue;
                }
                Ordering::Equal => {
                    let (pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk) =
                        self.last_serde.as_ref().unwrap();

                    let row = pk_prefix_serde.deserialize(inner_key).unwrap_or_else(|_| {
                        panic!(
                            "Failed to deserialize pk_prefix inner_key {:?} serde data_types {:?} order_types {:?}",
                            inner_key,
                            pk_prefix_serde.get_data_types(),
                            pk_prefix_serde.get_order_types()
                        );
                    });
                    let watermark_col_in_pk = row.datum_at(*watermark_col_idx_in_pk);

                    return direction.datum_filter_by_watermark(
                        watermark_col_in_pk,
                        watermark,
                        watermark_col_serde.get_order_types()[0],
                    );
                }
                Ordering::Greater => {
                    return false;
                }
            }
        }
        false
    }
}

pub type PkPrefixSkipWatermarkIterator<I> = SkipWatermarkIterator<I, PkPrefixSkipWatermarkState>;

pub type NonPkPrefixSkipWatermarkIterator<I> =
    SkipWatermarkIterator<I, NonPkPrefixSkipWatermarkState>;

#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap};
    use std::iter::{empty, once};
    use std::sync::Arc;

    use bytes::Bytes;
    use itertools::Itertools;
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::row::{OwnedRow, RowExt};
    use risingwave_common::types::{DataType, ScalarImpl};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::row_serde::OrderedRowSerde;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_hummock_sdk::EpochWithGap;
    use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey, gen_key_from_str};
    use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, WatermarkDirection};

    use super::PkPrefixSkipWatermarkState;
    use crate::compaction_catalog_manager::{
        CompactionCatalogAgent, FilterKeyExtractorImpl, FullKeyFilterKeyExtractor,
    };
    use crate::hummock::iterator::{
        HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
        NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator,
    };
    use crate::hummock::shared_buffer::shared_buffer_batch::{
        SharedBufferBatch, SharedBufferValue,
    };
    use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode};

    const EPOCH: u64 = test_epoch(1);
    const TABLE_ID: TableId = TableId::new(233);

    async fn assert_iter_eq(
        mut first: Option<impl HummockIterator>,
        mut second: impl HummockIterator,
        seek_key: Option<(usize, usize)>,
    ) {
        if let Some((vnode, key_index)) = seek_key {
            let (seek_key, _) = gen_key_value(vnode, key_index);
            let full_key = FullKey {
                user_key: UserKey {
                    table_id: TABLE_ID,
                    table_key: seek_key,
                },
                epoch_with_gap: EpochWithGap::new_from_epoch(EPOCH),
            };
            if let Some(first) = &mut first {
                first.seek(full_key.to_ref()).await.unwrap();
            }
            second.seek(full_key.to_ref()).await.unwrap()
        } else {
            if let Some(first) = &mut first {
                first.rewind().await.unwrap();
            }
            second.rewind().await.unwrap();
        }

        if let Some(first) = &mut first {
            while first.is_valid() {
                assert!(second.is_valid());
                let first_key = first.key();
                let second_key = second.key();
                assert_eq!(first_key, second_key);
                assert_eq!(first.value(), second.value());
                first.next().await.unwrap();
                second.next().await.unwrap();
            }
        }
        assert!(!second.is_valid());
    }

    fn build_batch(
        pairs: impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)>,
        table_id: TableId,
    ) -> Option<SharedBufferBatch> {
        let pairs: Vec<_> = pairs.collect();
        if pairs.is_empty() {
            None
        } else {
            Some(SharedBufferBatch::for_test(pairs, EPOCH, table_id))
        }
    }

    fn filter_with_watermarks(
        iter: impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)>,
        table_watermarks: ReadTableWatermark,
    ) -> impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)> {
        iter.filter(move |(key, _)| {
            if let Some(watermark) = table_watermarks.vnode_watermarks.get(&key.vnode_part()) {
                !table_watermarks
                    .direction
                    .key_filter_by_watermark(key.key_part(), watermark)
            } else {
                true
            }
        })
    }

    fn gen_inner_key(index: usize) -> String {
        format!("key{:5}", index)
    }

    async fn test_watermark(
        watermarks: impl IntoIterator<Item = (usize, usize)>,
        direction: WatermarkDirection,
    ) {
        let test_index: [(usize, usize); 7] =
            [(0, 2), (0, 3), (0, 4), (1, 1), (1, 3), (4, 2), (8, 1)];
        let items = test_index
            .iter()
            .map(|(vnode, key_index)| gen_key_value(*vnode, *key_index))
            .collect_vec();

        let read_watermark = ReadTableWatermark {
            direction,
            vnode_watermarks: BTreeMap::from_iter(watermarks.into_iter().map(
                |(vnode, key_index)| {
                    (
                        VirtualNode::from_index(vnode),
                        Bytes::from(gen_inner_key(key_index)),
                    )
                },
            )),
        };

        let gen_iters = || {
            let batch = build_batch(
                filter_with_watermarks(items.clone().into_iter(), read_watermark.clone()),
                TABLE_ID,
            );

            let iter = PkPrefixSkipWatermarkIterator::new(
                build_batch(items.clone().into_iter(), TABLE_ID)
                    .unwrap()
                    .into_forward_iter(),
                PkPrefixSkipWatermarkState::new(BTreeMap::from_iter(once((
                    TABLE_ID,
                    read_watermark.clone(),
                )))),
            );
            (batch.map(|batch| batch.into_forward_iter()), iter)
        };
        let (first, second) = gen_iters();
        assert_iter_eq(first, second, None).await;
        for (vnode, key_index) in &test_index {
            let (first, second) = gen_iters();
            assert_iter_eq(first, second, Some((*vnode, *key_index))).await;
        }
        let (last_vnode, last_key_index) = test_index.last().unwrap();
        let (first, second) = gen_iters();
        assert_iter_eq(first, second, Some((*last_vnode, last_key_index + 1))).await;
    }

    fn gen_key_value(vnode: usize, index: usize) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
        (
            gen_key_from_str(VirtualNode::from_index(vnode), &gen_inner_key(index)),
            SharedBufferValue::Insert(Bytes::copy_from_slice(
                format!("{}-value-{}", vnode, index).as_bytes(),
            )),
        )
    }

    #[tokio::test]
    async fn test_no_watermark() {
        test_watermark(empty(), WatermarkDirection::Ascending).await;
        test_watermark(empty(), WatermarkDirection::Descending).await;
    }

    #[tokio::test]
    async fn test_too_low_watermark() {
        test_watermark(vec![(0, 0)], WatermarkDirection::Ascending).await;
        test_watermark(vec![(0, 10)], WatermarkDirection::Descending).await;
    }

    #[tokio::test]
    async fn test_single_watermark() {
        test_watermark(vec![(0, 3)], WatermarkDirection::Ascending).await;
        test_watermark(vec![(0, 3)], WatermarkDirection::Descending).await;
    }

    #[tokio::test]
    async fn test_watermark_vnode_no_data() {
        test_watermark(vec![(3, 3)], WatermarkDirection::Ascending).await;
        test_watermark(vec![(3, 3)], WatermarkDirection::Descending).await;
    }

    #[tokio::test]
    async fn test_filter_all() {
        test_watermark(
            vec![(0, 5), (1, 4), (2, 0), (4, 3), (8, 2)],
            WatermarkDirection::Ascending,
        )
        .await;
        test_watermark(
            vec![(0, 0), (1, 0), (2, 0), (4, 0), (8, 0)],
            WatermarkDirection::Descending,
        )
        .await;
    }

    #[tokio::test]
    async fn test_advance_multi_vnode() {
        test_watermark(vec![(1, 2), (8, 0)], WatermarkDirection::Ascending).await;
    }

    #[tokio::test]
    async fn test_non_pk_prefix_watermark() {
        fn gen_key_value(
            vnode: usize,
            col_0: i32,
            col_1: i32,
            col_2: i32,
            col_3: i32,
            pk_serde: &OrderedRowSerde,
            pk_indices: &[usize],
        ) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
            let r = OwnedRow::new(vec![
                Some(ScalarImpl::Int32(col_0)),
                Some(ScalarImpl::Int32(col_1)),
                Some(ScalarImpl::Int32(col_2)),
                Some(ScalarImpl::Int32(col_3)),
            ]);

            let pk = r.project(pk_indices);

            let k1 = serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode));
            let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice(
                format!("{}-value-{}", vnode, col_2).as_bytes(),
            ));
            (k1, v1)
        }

        let watermark_direction = WatermarkDirection::Ascending;

        let watermark_col_serde =
            OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]);
        let pk_serde = OrderedRowSerde::new(
            vec![DataType::Int32, DataType::Int32, DataType::Int32],
            vec![
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
        );

        let pk_indices = vec![0, 2, 3];
        let watermark_col_idx_in_pk = 1;

        {
            let shared_buffer_batch = {
                let mut kv_pairs = (0..10)
                    .map(|i| gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices))
                    .collect_vec();
                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                build_batch(kv_pairs.into_iter(), TABLE_ID)
            }
            .unwrap();

            {
                let read_watermark = ReadTableWatermark {
                    direction: watermark_direction,
                    vnode_watermarks: BTreeMap::default(),
                };

                let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![TABLE_ID]);

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                for i in 0..10 {
                    let (k, _v) = gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices);
                    assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref());
                    iter.next().await.unwrap();
                }
                assert!(!iter.is_valid());
            }

            {
                let watermark = {
                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
                    serialize_pk(r1, &watermark_col_serde)
                };

                let read_watermark = ReadTableWatermark {
                    direction: watermark_direction,
                    vnode_watermarks: BTreeMap::from_iter(once((
                        VirtualNode::from_index(0),
                        watermark.clone(),
                    ))),
                };

                let full_key_filter_key_extractor =
                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

                let table_id_to_vnode =
                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

                let table_id_to_watermark_serde = HashMap::from_iter(once((
                    TABLE_ID,
                    Some((
                        pk_serde.clone(),
                        watermark_col_serde.clone(),
                        watermark_col_idx_in_pk,
                    )),
                )));

                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                    full_key_filter_key_extractor,
                    table_id_to_vnode,
                    table_id_to_watermark_serde,
                ));

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                for i in 5..10 {
                    let (k, _v) = gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices);
                    assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref());
                    iter.next().await.unwrap();
                }
                assert!(!iter.is_valid());
            }
        }

        {
            let shared_buffer_batch = {
                let mut kv_pairs = (0..10_i32)
                    .map(|i| gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices))
                    .collect_vec();

                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                build_batch(kv_pairs.into_iter(), TABLE_ID)
            };

            {
                let watermark = {
                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
                    serialize_pk(r1, &watermark_col_serde)
                };

                let read_watermark = ReadTableWatermark {
                    direction: watermark_direction,
                    vnode_watermarks: BTreeMap::from_iter(
                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
                    ),
                };

                let full_key_filter_key_extractor =
                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

                let table_id_to_vnode =
                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

                let table_id_to_watermark_serde = HashMap::from_iter(once((
                    TABLE_ID,
                    Some((
                        pk_serde.clone(),
                        watermark_col_serde.clone(),
                        watermark_col_idx_in_pk,
                    )),
                )));

                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                    full_key_filter_key_extractor,
                    table_id_to_vnode,
                    table_id_to_watermark_serde,
                ));

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                let mut kv_pairs = (5..10_i32)
                    .map(|i| {
                        let (k, v) =
                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
                        (k, v)
                    })
                    .collect_vec();
                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                let mut index = 0;
                while iter.is_valid() {
                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                    iter.next().await.unwrap();
                    index += 1;
                }
            }

            {
                let watermark = {
                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
                    serialize_pk(r1, &watermark_col_serde)
                };

                let read_watermark = ReadTableWatermark {
                    direction: watermark_direction,
                    vnode_watermarks: BTreeMap::from_iter(
                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
                    ),
                };

                let full_key_filter_key_extractor =
                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

                let table_id_to_vnode =
                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

                let table_id_to_watermark_serde = HashMap::from_iter(once((
                    TABLE_ID,
                    Some((
                        pk_serde.clone(),
                        watermark_col_serde.clone(),
                        watermark_col_idx_in_pk,
                    )),
                )));

                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                    full_key_filter_key_extractor,
                    table_id_to_vnode,
                    table_id_to_watermark_serde,
                ));

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                let mut kv_pairs = (5..10_i32)
                    .map(|i| {
                        let (k, v) =
                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
                        (k, v)
                    })
                    .collect_vec();
                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                let mut index = 0;
                while iter.is_valid() {
                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                    iter.next().await.unwrap();
                    index += 1;
                }
            }

            {
                let watermark = {
                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
                    serialize_pk(r1, &watermark_col_serde)
                };

                let read_watermark = ReadTableWatermark {
                    direction: WatermarkDirection::Descending,
                    vnode_watermarks: BTreeMap::from_iter(
                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
                    ),
                };

                let full_key_filter_key_extractor =
                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

                let table_id_to_vnode =
                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

                let table_id_to_watermark_serde = HashMap::from_iter(once((
                    TABLE_ID,
                    Some((
                        pk_serde.clone(),
                        watermark_col_serde.clone(),
                        watermark_col_idx_in_pk,
                    )),
                )));

                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                    full_key_filter_key_extractor,
                    table_id_to_vnode,
                    table_id_to_watermark_serde,
                ));

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                let mut kv_pairs = (0..=5_i32)
                    .map(|i| {
                        let (k, v) =
                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
                        (k, v)
                    })
                    .collect_vec();
                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                let mut index = 0;
                while iter.is_valid() {
                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                    iter.next().await.unwrap();
                    index += 1;
                }
            }
        }
    }

    #[tokio::test]
    async fn test_mix_watermark() {
        fn gen_key_value(
            vnode: usize,
            col_0: i32,
            col_1: i32,
            col_2: i32,
            col_3: i32,
            pk_serde: &OrderedRowSerde,
            pk_indices: &[usize],
        ) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
            let r = OwnedRow::new(vec![
                Some(ScalarImpl::Int32(col_0)),
                Some(ScalarImpl::Int32(col_1)),
                Some(ScalarImpl::Int32(col_2)),
                Some(ScalarImpl::Int32(col_3)),
            ]);

            let pk = r.project(pk_indices);

            let k1 = serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode));
            let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice(
                format!("{}-value-{}-{}-{}-{}", vnode, col_0, col_1, col_2, col_3).as_bytes(),
            ));

            (k1, v1)
        }

        let watermark_col_serde =
            OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]);
        let t1_pk_serde = OrderedRowSerde::new(
            vec![DataType::Int32, DataType::Int32, DataType::Int32],
            vec![
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
        );

        let t1_pk_indices = vec![0, 2, 3];
        let t1_watermark_col_idx_in_pk = 1;

        let t2_pk_indices = vec![0, 1, 2];

        let t2_pk_serde = OrderedRowSerde::new(
            vec![DataType::Int32, DataType::Int32, DataType::Int32],
            vec![
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
        );

        let t1_id = TABLE_ID;
        let t2_id = TableId::from(t1_id.as_raw_id() + 1);

        let t1_shared_buffer_batch = {
            let mut kv_pairs = (0..10_i32)
                .map(|i| {
                    gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        i,
                        i,
                        &t1_pk_serde,
                        &t1_pk_indices,
                    )
                })
                .collect_vec();

            kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
            build_batch(kv_pairs.into_iter(), t1_id).unwrap()
        };

        let t2_shared_buffer_batch = {
            let mut kv_pairs = (0..10_i32)
                .map(|i| {
                    gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        0,
                        0,
                        &t2_pk_serde,
                        &t2_pk_indices,
                    )
                })
                .collect_vec();

            kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
            build_batch(kv_pairs.into_iter(), t2_id).unwrap()
        };

        let t1_watermark = {
            let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
            serialize_pk(r1, &watermark_col_serde)
        };

        let t1_read_watermark = ReadTableWatermark {
            direction: WatermarkDirection::Ascending,
            vnode_watermarks: BTreeMap::from_iter(
                (0..2).map(|i| (VirtualNode::from_index(i), t1_watermark.clone())),
            ),
        };

        let t2_watermark = {
            let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
            serialize_pk(r1, &watermark_col_serde)
        };

        let t2_read_watermark = ReadTableWatermark {
            direction: WatermarkDirection::Ascending,
            vnode_watermarks: BTreeMap::from_iter(
                (0..2).map(|i| (VirtualNode::from_index(i), t2_watermark.clone())),
            ),
        };

        {
            let t1_iter = t1_shared_buffer_batch.clone().into_forward_iter();
            let t2_iter = t2_shared_buffer_batch.clone().into_forward_iter();
            let iter_vec = vec![t1_iter, t2_iter];
            let merge_iter = MergeIterator::new(iter_vec);

            let full_key_filter_key_extractor =
                FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

            let table_id_to_vnode =
                HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

            let table_id_to_watermark_serde = HashMap::from_iter(once((
                t1_id,
                Some((
                    t1_pk_serde.clone(),
                    watermark_col_serde.clone(),
                    t1_watermark_col_idx_in_pk,
                )),
            )));

            let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                full_key_filter_key_extractor,
                table_id_to_vnode,
                table_id_to_watermark_serde,
            ));

            let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                merge_iter,
                NonPkPrefixSkipWatermarkState::new(
                    BTreeMap::from_iter(once((TABLE_ID, t1_read_watermark.clone()))),
                    compaction_catalog_agent_ref,
                ),
            );

            iter.rewind().await.unwrap();
            assert!(iter.is_valid());
            let mut t1_kv_pairs = (5..10_i32)
                .map(|i| {
                    let (k, v) = gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        i,
                        i,
                        &t1_pk_serde,
                        &t1_pk_indices,
                    );
                    (k, v)
                })
                .collect_vec();

            t1_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));

            let mut t2_kv_pairs = (0..10_i32)
                .map(|i| {
                    gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        0,
                        0,
                        &t2_pk_serde,
                        &t2_pk_indices,
                    )
                })
                .collect_vec();

            t2_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
            let mut index = 0;
            for _ in 0..t1_kv_pairs.len() {
                assert!(t1_kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                iter.next().await.unwrap();
                index += 1;
            }

            assert!(iter.is_valid());
            assert_eq!(t1_kv_pairs.len(), index);

            index = 0;
            for _ in 0..t2_kv_pairs.len() {
                assert!(t2_kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                iter.next().await.unwrap();
                index += 1;
            }

            assert!(!iter.is_valid());
            assert_eq!(t2_kv_pairs.len(), index);
        }

        {
            let t1_iter = t1_shared_buffer_batch.clone().into_forward_iter();
            let t2_iter = t2_shared_buffer_batch.clone().into_forward_iter();
            let iter_vec = vec![t1_iter, t2_iter];
            let merge_iter = MergeIterator::new(iter_vec);

            let full_key_filter_key_extractor =
                FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

            let table_id_to_vnode = HashMap::from_iter(
                vec![
                    (t1_id, VirtualNode::COUNT_FOR_TEST),
                    (t2_id, VirtualNode::COUNT_FOR_TEST),
                ]
                .into_iter(),
            );

            let table_id_to_watermark_serde = HashMap::from_iter(
                vec![
                    (
                        t1_id,
                        Some((
                            t1_pk_serde.clone(),
                            watermark_col_serde.clone(),
                            t1_watermark_col_idx_in_pk,
                        )),
                    ),
                    (t2_id, None),
                ]
                .into_iter(),
            );

            let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                full_key_filter_key_extractor,
                table_id_to_vnode,
                table_id_to_watermark_serde,
            ));

            let non_pk_prefix_iter = NonPkPrefixSkipWatermarkIterator::new(
                merge_iter,
                NonPkPrefixSkipWatermarkState::new(
                    BTreeMap::from_iter(once((t1_id, t1_read_watermark.clone()))),
                    compaction_catalog_agent_ref.clone(),
                ),
            );

            let mut mix_iter = PkPrefixSkipWatermarkIterator::new(
                non_pk_prefix_iter,
                PkPrefixSkipWatermarkState::new(BTreeMap::from_iter(once((
                    t2_id,
                    t2_read_watermark.clone(),
                )))),
            );

            mix_iter.rewind().await.unwrap();
            assert!(mix_iter.is_valid());

            let mut t1_kv_pairs = (5..10_i32)
                .map(|i| {
                    let (k, v) = gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        i,
                        i,
                        &t1_pk_serde,
                        &t1_pk_indices,
                    );
                    (k, v)
                })
                .collect_vec();

            t1_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));

            let mut t2_kv_pairs = (0..=5_i32)
                .map(|i| {
                    gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        0,
                        0,
                        &t2_pk_serde,
                        &t2_pk_indices,
                    )
                })
                .collect_vec();

            t2_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));

            let mut index = 0;
            for _ in 0..t1_kv_pairs.len() {
                assert!(
                    t1_kv_pairs[index].0.as_ref() == mix_iter.key().user_key.table_key.as_ref()
                );
                mix_iter.next().await.unwrap();
                index += 1;
            }

            assert!(mix_iter.is_valid());
            assert_eq!(t1_kv_pairs.len(), index);

            index = 0;

            for _ in 0..t2_kv_pairs.len() {
                assert!(
                    t2_kv_pairs[index].0.as_ref() == mix_iter.key().user_key.table_key.as_ref()
                );
                mix_iter.next().await.unwrap();
                index += 1;
            }

            assert!(!mix_iter.is_valid());
            assert_eq!(t2_kv_pairs.len(), index);
        }
    }
}