1use std::cmp::Ordering;
16use std::collections::{BTreeMap, VecDeque};
17
18use bytes::Bytes;
19use risingwave_common::catalog::TableId;
20use risingwave_common::hash::VirtualNode;
21use risingwave_common::row::Row;
22use risingwave_common::types::Datum;
23use risingwave_common::util::row_serde::OrderedRowSerde;
24use risingwave_hummock_sdk::compaction_group::hummock_version_ext::safe_epoch_read_table_watermarks_impl;
25use risingwave_hummock_sdk::key::FullKey;
26use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
27use risingwave_hummock_sdk::table_watermark::{
28 ReadTableWatermark, TableWatermarks, WatermarkDirection,
29};
30
31use super::SkipWatermarkState;
32use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, ValueWatermarkColumnSerdeRef};
33use crate::hummock::HummockResult;
34use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
35use crate::hummock::value::HummockValue;
36use crate::monitor::StoreLocalStatistic;
37
/// A forward iterator adaptor that transparently skips entries which fall on
/// the "cleaned" side of a table watermark, as decided by the pluggable
/// [`SkipWatermarkState`] implementation `S`.
///
/// While skipping, it accumulates per-table statistics about the dropped
/// entries so the savings can be reported via `collect_local_statistic`.
pub struct SkipWatermarkIterator<I, S> {
    // The underlying forward iterator being filtered.
    inner: I,
    // Watermark bookkeeping that decides whether the current entry is skipped.
    state: S,
    // Per-table stats of all entries skipped so far (stored as negative deltas).
    skipped_entry_table_stats: TableStatsMap,
    // Table id whose skipped entries are currently accumulated in
    // `last_table_stats`; flushed into `skipped_entry_table_stats` when the
    // table changes or skipping stops.
    last_table_id: Option<TableId>,
    // Stats delta accumulated for `last_table_id` since the last flush.
    last_table_stats: TableStats,
}
48
49impl<I: HummockIterator<Direction = Forward>, S: SkipWatermarkState> SkipWatermarkIterator<I, S> {
50 pub fn new(inner: I, state: S) -> Self {
51 Self {
52 inner,
53 state,
54 skipped_entry_table_stats: TableStatsMap::default(),
55 last_table_id: None,
56 last_table_stats: TableStats::default(),
57 }
58 }
59
60 fn reset_watermark(&mut self) {
61 self.state.reset_watermark();
62 }
63
64 fn reset_skipped_entry_table_stats(&mut self) {
65 self.skipped_entry_table_stats = TableStatsMap::default();
66 self.last_table_id = None;
67 self.last_table_stats = TableStats::default();
68 }
69
70 async fn advance_key_and_watermark(&mut self) -> HummockResult<()> {
75 while self.inner.is_valid() {
78 if !self
79 .state
80 .should_delete(&self.inner.key(), self.inner.value())
81 {
82 break;
83 }
84
85 if self
86 .last_table_id
87 .is_none_or(|last_table_id| last_table_id != self.inner.key().user_key.table_id)
88 {
89 self.add_last_table_stats();
90 self.last_table_id = Some(self.inner.key().user_key.table_id);
91 }
92 self.last_table_stats.total_key_count -= 1;
93 self.last_table_stats.total_key_size -= self.inner.key().encoded_len() as i64;
94 self.last_table_stats.total_value_size -= self.inner.value().encoded_len() as i64;
95
96 self.inner.next().await?;
97 }
98 self.add_last_table_stats();
99 Ok(())
100 }
101
102 fn add_last_table_stats(&mut self) {
103 let Some(last_table_id) = self.last_table_id.take() else {
104 return;
105 };
106 let delta = std::mem::take(&mut self.last_table_stats);
107 let e = self
108 .skipped_entry_table_stats
109 .entry(last_table_id)
110 .or_default();
111 e.total_key_count += delta.total_key_count;
112 e.total_key_size += delta.total_key_size;
113 e.total_value_size += delta.total_value_size;
114 }
115}
116
117impl<I: HummockIterator<Direction = Forward>, S: SkipWatermarkState> HummockIterator
118 for SkipWatermarkIterator<I, S>
119{
120 type Direction = Forward;
121
122 async fn next(&mut self) -> HummockResult<()> {
123 self.inner.next().await?;
124 if self.state.has_watermark() {
128 self.advance_key_and_watermark().await?;
129 }
130 Ok(())
131 }
132
133 fn key(&self) -> FullKey<&[u8]> {
134 self.inner.key()
135 }
136
137 fn value(&self) -> HummockValue<&[u8]> {
138 self.inner.value()
139 }
140
141 fn is_valid(&self) -> bool {
142 self.inner.is_valid()
143 }
144
145 async fn rewind(&mut self) -> HummockResult<()> {
146 self.reset_watermark();
147 self.reset_skipped_entry_table_stats();
148 self.inner.rewind().await?;
149 self.advance_key_and_watermark().await?;
150 Ok(())
151 }
152
153 async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
154 self.reset_watermark();
155 self.reset_skipped_entry_table_stats();
156 self.inner.seek(key).await?;
157 self.advance_key_and_watermark().await?;
158 Ok(())
159 }
160
161 fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
162 add_table_stats_map(
163 &mut stats.skipped_by_watermark_table_stats,
164 &self.skipped_entry_table_stats,
165 );
166 self.inner.collect_local_statistic(stats)
167 }
168
169 fn value_meta(&self) -> ValueMeta {
170 self.inner.value_meta()
171 }
172}
/// Watermark state for tables whose serialized key bytes can be compared
/// directly against the serialized watermark bytes (the watermark is a prefix
/// of the table key — see `key_filter_by_watermark`).
pub struct PkPrefixSkipWatermarkState {
    // All watermarks keyed by table id; the source of truth for resets.
    watermarks: BTreeMap<TableId, ReadTableWatermark>,
    // Flattened (table, vnode) watermark queue, consumed front-to-back as the
    // iterator advances in key order. Rebuilt by `reset_watermark`.
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Bytes)>,
}
177
impl SkipWatermarkState for PkPrefixSkipWatermarkState {
    #[inline(always)]
    fn has_watermark(&self) -> bool {
        !self.remain_watermarks.is_empty()
    }

    /// Returns whether `key` should be dropped under the watermark currently
    /// at the front of the queue.
    fn should_delete(&mut self, key: &FullKey<&[u8]>, value: HummockValue<&[u8]>) -> bool {
        if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            let key_table_id = key.user_key.table_id;
            let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
            match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
                Ordering::Less => {
                    // Key is before the front watermark's (table, vnode):
                    // no watermark applies to it yet.
                    return false;
                }
                Ordering::Equal => {
                    // Same (table, vnode): compare raw key bytes against the
                    // serialized watermark.
                    return direction.key_filter_by_watermark(inner_key, watermark);
                }
                Ordering::Greater => {
                    // Key has moved past the front watermark: pop stale
                    // watermarks and re-evaluate against the new front.
                    return self.advance_watermark(key, value);
                }
            }
        }
        false
    }

    /// Rebuilds the pending queue from `watermarks`, flattened in
    /// (table, vnode) order; `BTreeMap` iteration keeps it sorted.
    fn reset_watermark(&mut self) {
        self.remain_watermarks = self
            .watermarks
            .iter()
            .flat_map(|(table_id, read_watermarks)| {
                read_watermarks
                    .vnode_watermarks
                    .iter()
                    .map(|(vnode, watermarks)| {
                        (
                            *table_id,
                            *vnode,
                            read_watermarks.direction,
                            watermarks.clone(),
                        )
                    })
            })
            .collect();
    }

    /// Pops queued watermarks whose (table, vnode) the key has already passed,
    /// then evaluates the first applicable watermark. Returns whether `key`
    /// should be dropped.
    fn advance_watermark(&mut self, key: &FullKey<&[u8]>, _value: HummockValue<&[u8]>) -> bool {
        let key_table_id = key.user_key.table_id;
        let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
        while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
                Ordering::Less => {
                    // Watermark belongs to a (table, vnode) already passed.
                    self.remain_watermarks.pop_front();
                    continue;
                }
                Ordering::Equal => {
                    match direction {
                        WatermarkDirection::Ascending => {
                            match inner_key.cmp(watermark.as_ref()) {
                                Ordering::Less => {
                                    // Below an ascending watermark: drop.
                                    return true;
                                }
                                Ordering::Equal | Ordering::Greater => {
                                    // At or beyond an ascending watermark it can
                                    // never match again in this (table, vnode)
                                    // (keys only grow), so retire it.
                                    self.remain_watermarks.pop_front();
                                    #[cfg(debug_assertions)]
                                    {
                                        // Invariant: the next queued watermark must be
                                        // strictly ahead of the current key.
                                        if let Some((next_table_id, next_vnode, _, _)) =
                                            self.remain_watermarks.front()
                                        {
                                            assert!(
                                                (next_table_id, next_vnode)
                                                    > (&key_table_id, &key_vnode)
                                            );
                                        }
                                    }
                                    return false;
                                }
                            }
                        }
                        WatermarkDirection::Descending => {
                            // Descending watermarks drop keys above the bound;
                            // the watermark stays queued for subsequent keys.
                            return match inner_key.cmp(watermark.as_ref()) {
                                Ordering::Less | Ordering::Equal => false,
                                Ordering::Greater => true,
                            };
                        }
                    }
                }
                Ordering::Greater => {
                    // Next watermark is still ahead of the key: keep it queued.
                    return false;
                }
            }
        }
        false
    }
}
289
290impl PkPrefixSkipWatermarkState {
291 pub fn new(watermarks: BTreeMap<TableId, ReadTableWatermark>) -> Self {
292 Self {
293 remain_watermarks: VecDeque::new(),
294 watermarks,
295 }
296 }
297
298 pub fn from_safe_epoch_watermarks(
299 safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
300 ) -> Self {
301 let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
302 Self::new(watermarks)
303 }
304}
305
/// Watermark state for tables whose watermark column is not directly
/// byte-comparable in the key prefix: keys are deserialized with the table's
/// pk serde so the watermark column can be extracted and compared as a datum.
pub struct NonPkPrefixSkipWatermarkState {
    // All watermarks keyed by table id; the source of truth for resets.
    watermarks: BTreeMap<TableId, ReadTableWatermark>,
    // Flattened (table, vnode) watermark queue, consumed front-to-back.
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Datum)>,
    // Catalog access used to look up each table's serdes.
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,

    // Cached (pk-prefix serde, watermark-column serde, watermark column index
    // in pk) for the table of the current queue front; invalidated on pop.
    last_serde: Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
    // Table id most recently seen by `should_delete`. NOTE(review): it is
    // written but never read within this file — looks vestigial; confirm.
    last_table_id: Option<TableId>,
}
314
315impl NonPkPrefixSkipWatermarkState {
316 pub fn new(
317 watermarks: BTreeMap<TableId, ReadTableWatermark>,
318 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
319 ) -> Self {
320 Self {
321 remain_watermarks: VecDeque::new(),
322 watermarks,
323 compaction_catalog_agent_ref,
324 last_serde: None,
325 last_table_id: None,
326 }
327 }
328
329 pub fn from_safe_epoch_watermarks(
330 safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
331 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
332 ) -> Self {
333 let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
334 Self::new(watermarks, compaction_catalog_agent_ref)
335 }
336}
337
impl SkipWatermarkState for NonPkPrefixSkipWatermarkState {
    #[inline(always)]
    fn has_watermark(&self) -> bool {
        !self.remain_watermarks.is_empty()
    }

    /// Returns whether `key` should be dropped: deserializes the pk prefix and
    /// compares the watermark column datum against the front watermark.
    ///
    /// Panics if the key cannot be deserialized with the cached pk serde.
    fn should_delete(&mut self, key: &FullKey<&[u8]>, value: HummockValue<&[u8]>) -> bool {
        if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            let key_table_id = key.user_key.table_id;
            {
                // Track the most recent table id (see struct-level note: this
                // field is not read elsewhere in this file).
                if self
                    .last_table_id
                    .is_none_or(|last_table_id| last_table_id != key_table_id)
                {
                    self.last_table_id = Some(key_table_id);
                }
            }

            let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
            match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
                Ordering::Less => {
                    // Key is before the front watermark's (table, vnode).
                    return false;
                }
                Ordering::Equal => {
                    // Lazily fetch and cache the serdes for the front's table.
                    if self.last_serde.is_none() {
                        self.last_serde =
                            self.compaction_catalog_agent_ref.watermark_serde(*table_id);
                    }
                    let (pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk) =
                        self.last_serde.as_ref().unwrap();
                    let row = pk_prefix_serde
                        .deserialize(inner_key)
                        .unwrap_or_else(|_| {
                            panic!("Failed to deserialize pk_prefix inner_key {:?} serde data_types {:?} order_types {:?}", inner_key, pk_prefix_serde.get_data_types(), pk_prefix_serde.get_order_types());
                        });
                    // Extract the watermark column from the decoded pk and let
                    // the direction decide whether the row is filtered.
                    let watermark_col_in_pk = row.datum_at(*watermark_col_idx_in_pk);
                    return direction.datum_filter_by_watermark(
                        watermark_col_in_pk,
                        watermark,
                        watermark_col_serde.get_order_types()[0],
                    );
                }
                Ordering::Greater => {
                    // Key has moved past the front watermark: pop stale
                    // watermarks and re-evaluate against the new front.
                    return self.advance_watermark(key, value);
                }
            }
        }
        false
    }

    /// Rebuilds the pending queue, deserializing each watermark into a datum.
    /// Tables without a watermark serde in the catalog are silently skipped.
    ///
    /// Panics if a watermark cannot be deserialized with its table's serde.
    fn reset_watermark(&mut self) {
        self.last_serde = None;
        self.remain_watermarks = self
            .watermarks
            .iter()
            .flat_map(|(table_id, read_watermarks)| {
                let watermark_serde = self.compaction_catalog_agent_ref.watermark_serde(*table_id).map(|(_pk_serde, watermark_serde, _watermark_col_idx_in_pk)| watermark_serde);

                read_watermarks
                    .vnode_watermarks
                    .iter()
                    .flat_map(move |(vnode, watermarks)| {
                        // `?` inside the closure drops entries whose table has
                        // no watermark serde.
                        let watermark_serde = watermark_serde.as_ref()?;
                        Some((
                            *table_id,
                            *vnode,
                            read_watermarks.direction,
                            {
                                let row = watermark_serde
                                    .deserialize(watermarks).unwrap_or_else(|_| {
                                        panic!("Failed to deserialize watermark {:?} serde data_types {:?} order_types {:?}", watermarks, watermark_serde.get_data_types(), watermark_serde.get_order_types());
                                    });
                                // Single-column watermark: take the first datum.
                                row[0].clone()
                            },
                        ))
                    })
            })
            .collect();
    }

    /// Pops queued watermarks whose (table, vnode) the key has already passed,
    /// then evaluates the first applicable watermark (deserializing the key's
    /// pk prefix to compare the watermark column as a datum).
    ///
    /// Panics if the key cannot be deserialized with the cached pk serde.
    fn advance_watermark(&mut self, key: &FullKey<&[u8]>, _value: HummockValue<&[u8]>) -> bool {
        let key_table_id = key.user_key.table_id;
        let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
        while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
                Ordering::Less => {
                    // Stale watermark; its cached serde is stale too.
                    self.remain_watermarks.pop_front();
                    self.last_serde = None;
                    continue;
                }
                Ordering::Equal => {
                    if self.last_serde.is_none() {
                        self.last_serde =
                            self.compaction_catalog_agent_ref.watermark_serde(*table_id);
                    }
                    let (pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk) =
                        self.last_serde.as_ref().unwrap();

                    let row = pk_prefix_serde
                        .deserialize(inner_key)
                        .unwrap_or_else(|_| {
                            panic!("Failed to deserialize pk_prefix inner_key {:?} serde data_types {:?} order_types {:?}", inner_key, pk_prefix_serde.get_data_types(), pk_prefix_serde.get_order_types());
                        });
                    let watermark_col_in_pk = row.datum_at(*watermark_col_idx_in_pk);

                    return direction.datum_filter_by_watermark(
                        watermark_col_in_pk,
                        watermark,
                        watermark_col_serde.get_order_types()[0],
                    );
                }
                Ordering::Greater => {
                    // Next watermark is still ahead of the key: keep it queued.
                    return false;
                }
            }
        }
        false
    }
}
460
/// Skip-watermark iterator driven by raw pk-prefix byte comparison.
pub type PkPrefixSkipWatermarkIterator<I> = SkipWatermarkIterator<I, PkPrefixSkipWatermarkState>;

/// Skip-watermark iterator driven by datum comparison of a non-pk-prefix
/// watermark column decoded from the key.
pub type NonPkPrefixSkipWatermarkIterator<I> =
    SkipWatermarkIterator<I, NonPkPrefixSkipWatermarkState>;

/// Skip-watermark iterator whose watermark column is decoded from the value.
pub type ValueSkipWatermarkIterator<I> = SkipWatermarkIterator<I, ValueSkipWatermarkState>;
467
/// Watermark state for tables whose watermark column lives in the row value:
/// the value bytes are deserialized (via `value_watermark_serde`) and the
/// extracted column is compared against the watermark.
pub struct ValueSkipWatermarkState {
    // All watermarks keyed by table id; the source of truth for resets.
    pub watermarks: BTreeMap<TableId, ReadTableWatermark>,
    // Flattened (table, vnode) watermark queue, consumed front-to-back.
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Bytes)>,
    // Catalog access used to look up each table's value-watermark serde.
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    // Cached value-watermark serde for the table of the current queue front;
    // invalidated on pop.
    last_serde: Option<ValueWatermarkColumnSerdeRef>,
    // Table id most recently seen by `should_delete`. NOTE(review): written
    // but never read within this file — looks vestigial; confirm.
    last_table_id: Option<TableId>,
}
475
476impl ValueSkipWatermarkState {
477 pub fn new(
478 watermarks: BTreeMap<TableId, ReadTableWatermark>,
479 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
480 ) -> Self {
481 Self {
482 remain_watermarks: VecDeque::new(),
483 watermarks,
484 compaction_catalog_agent_ref,
485 last_serde: None,
486 last_table_id: None,
487 }
488 }
489
490 pub fn from_safe_epoch_watermarks(
491 safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
492 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
493 ) -> Self {
494 let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
495 Self::new(watermarks, compaction_catalog_agent_ref)
496 }
497
498 pub fn may_delete(&self, key: &FullKey<&[u8]>) -> bool {
499 let table_id = key.user_key.table_id;
500 self.watermarks.contains_key(&table_id)
501 }
502}
503
impl SkipWatermarkState for ValueSkipWatermarkState {
    #[inline(always)]
    fn has_watermark(&self) -> bool {
        !self.remain_watermarks.is_empty()
    }

    /// Returns whether `key` should be dropped based on a watermark column
    /// decoded from the `Put` value. Deletes and any value that fails to
    /// decode are always retained (returns `false`).
    fn should_delete(&mut self, key: &FullKey<&[u8]>, value: HummockValue<&[u8]>) -> bool {
        if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            let key_table_id = key.user_key.table_id;
            let key_vnode = key.user_key.table_key.vnode_part();
            // Track the most recent table id (see struct-level note: this
            // field is not read elsewhere in this file).
            if self
                .last_table_id
                .is_none_or(|last_table_id| last_table_id != key_table_id)
            {
                self.last_table_id = Some(key_table_id);
            }
            match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
                Ordering::Less => {
                    // Key is before the front watermark's (table, vnode).
                    return false;
                }
                Ordering::Equal => {
                    // Only `Put` values carry a watermark column.
                    let HummockValue::Put(value) = value else {
                        return false;
                    };
                    // Lazily fetch and cache the serde for the front's table.
                    if self.last_serde.is_none() {
                        self.last_serde = self
                            .compaction_catalog_agent_ref
                            .value_watermark_serde(*table_id);
                    }
                    let last_serde = self.last_serde.as_ref().unwrap();
                    let Ok(watermark_column_value) = last_serde.deserialize(value) else {
                        tracing::error!(
                            ?table_id,
                            ?vnode,
                            ?value,
                            "Failed to deserialize watermark column"
                        );
                        return false;
                    };
                    // NOTE(review): this path logs at `debug!` while the same
                    // situation in `advance_watermark` logs at `warn!` —
                    // confirm whether the difference is intentional.
                    let Some(watermark_column_value) = watermark_column_value else {
                        tracing::debug!(
                            ?table_id,
                            ?vnode,
                            ?value,
                            "The specified watermark column was not found in the value, likely due to the use of a non-current catalog. Restarting the compactor should resolve this issue."
                        );
                        return false;
                    };
                    return direction.key_filter_by_watermark(&watermark_column_value, watermark);
                }
                Ordering::Greater => {
                    // Key has moved past the front watermark: pop stale
                    // watermarks and re-evaluate against the new front.
                    return self.advance_watermark(key, value);
                }
            }
        }
        false
    }

    /// Rebuilds the pending queue from `watermarks`, flattened in
    /// (table, vnode) order, and invalidates the cached serde.
    fn reset_watermark(&mut self) {
        self.last_serde = None;
        self.remain_watermarks = self
            .watermarks
            .iter()
            .flat_map(|(table_id, read_watermarks)| {
                read_watermarks
                    .vnode_watermarks
                    .iter()
                    .map(|(vnode, watermarks)| {
                        (
                            *table_id,
                            *vnode,
                            read_watermarks.direction,
                            watermarks.clone(),
                        )
                    })
            })
            .collect();
    }

    /// Pops queued watermarks whose (table, vnode) the key has already passed,
    /// then evaluates the first applicable watermark against the value's
    /// decoded watermark column.
    fn advance_watermark(&mut self, key: &FullKey<&[u8]>, value: HummockValue<&[u8]>) -> bool {
        let key_table_id = key.user_key.table_id;
        let key_vnode = key.user_key.table_key.vnode_part();
        while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
                Ordering::Less => {
                    // Stale watermark; its cached serde is stale too.
                    self.remain_watermarks.pop_front();
                    self.last_serde = None;
                    continue;
                }
                Ordering::Equal => {
                    // Only `Put` values carry a watermark column.
                    let HummockValue::Put(value) = value else {
                        return false;
                    };
                    if self.last_serde.is_none() {
                        self.last_serde = self
                            .compaction_catalog_agent_ref
                            .value_watermark_serde(*table_id);
                    }
                    let last_serde = self.last_serde.as_ref().unwrap();
                    let Ok(watermark_column_value) = last_serde.deserialize(value) else {
                        tracing::error!(
                            ?table_id,
                            ?vnode,
                            ?value,
                            "Failed to deserialize watermark column."
                        );
                        return false;
                    };
                    let Some(watermark_column_value) = watermark_column_value else {
                        tracing::warn!(
                            ?table_id,
                            ?vnode,
                            ?value,
                            "The specified watermark column was not found in the value, likely due to the use of a non-current catalog. Restarting the compactor should resolve this issue."
                        );
                        return false;
                    };
                    return direction.key_filter_by_watermark(&watermark_column_value, watermark);
                }
                Ordering::Greater => {
                    // Next watermark is still ahead of the key: keep it queued.
                    return false;
                }
            }
        }
        false
    }
}
633
634#[cfg(test)]
635mod tests {
636 use std::collections::{BTreeMap, HashMap};
637 use std::iter::{empty, once};
638 use std::sync::Arc;
639
640 use bytes::Bytes;
641 use itertools::Itertools;
642 use risingwave_common::catalog::TableId;
643 use risingwave_common::hash::VirtualNode;
644 use risingwave_common::row::{OwnedRow, RowExt};
645 use risingwave_common::types::{DataType, ScalarImpl};
646 use risingwave_common::util::epoch::test_epoch;
647 use risingwave_common::util::row_serde::OrderedRowSerde;
648 use risingwave_common::util::sort_util::OrderType;
649 use risingwave_hummock_sdk::EpochWithGap;
650 use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey, gen_key_from_str};
651 use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, WatermarkDirection};
652
653 use super::PkPrefixSkipWatermarkState;
654 use crate::compaction_catalog_manager::{
655 CompactionCatalogAgent, FilterKeyExtractorImpl, FullKeyFilterKeyExtractor,
656 };
657 use crate::hummock::HummockValue;
658 use crate::hummock::iterator::{
659 HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
660 NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, SkipWatermarkState,
661 };
662 use crate::hummock::shared_buffer::shared_buffer_batch::{
663 SharedBufferBatch, SharedBufferValue,
664 };
665 use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode};
666
    // Epoch used for all test data in this module.
    const EPOCH: u64 = test_epoch(1);
    // Default table id for single-table tests.
    const TABLE_ID: TableId = TableId::new(233);
669
    /// Drives `second` (the iterator under test) and, when present, `first`
    /// (the expected reference iterator) from the same starting position —
    /// either a seek to the key built from `seek_key = (vnode, key_index)` or
    /// a rewind — and asserts they yield identical (key, value) streams until
    /// both are exhausted. With `first == None`, only asserts `second` is
    /// empty after positioning.
    async fn assert_iter_eq(
        mut first: Option<impl HummockIterator>,
        mut second: impl HummockIterator,
        seek_key: Option<(usize, usize)>,
    ) {
        if let Some((vnode, key_index)) = seek_key {
            let (seek_key, _) = gen_key_value(vnode, key_index);
            let full_key = FullKey {
                user_key: UserKey {
                    table_id: TABLE_ID,
                    table_key: seek_key,
                },
                epoch_with_gap: EpochWithGap::new_from_epoch(EPOCH),
            };
            if let Some(first) = &mut first {
                first.seek(full_key.to_ref()).await.unwrap();
            }
            second.seek(full_key.to_ref()).await.unwrap()
        } else {
            if let Some(first) = &mut first {
                first.rewind().await.unwrap();
            }
            second.rewind().await.unwrap();
        }

        if let Some(first) = &mut first {
            while first.is_valid() {
                assert!(second.is_valid());
                let first_key = first.key();
                let second_key = second.key();
                assert_eq!(first_key, second_key);
                assert_eq!(first.value(), second.value());
                first.next().await.unwrap();
                second.next().await.unwrap();
            }
        }
        // Both iterators must end together.
        assert!(!second.is_valid());
    }
708
709 fn build_batch(
710 pairs: impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)>,
711 table_id: TableId,
712 ) -> Option<SharedBufferBatch> {
713 let pairs: Vec<_> = pairs.collect();
714 if pairs.is_empty() {
715 None
716 } else {
717 Some(SharedBufferBatch::for_test(pairs, EPOCH, table_id))
718 }
719 }
720
    /// Reference filter: keeps only the pairs that the per-vnode watermarks
    /// would NOT delete, mirroring the expected output of the iterator under
    /// test. Keys in vnodes without a watermark are always kept.
    fn filter_with_watermarks(
        iter: impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)>,
        table_watermarks: ReadTableWatermark,
    ) -> impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)> {
        iter.filter(move |(key, _)| {
            if let Some(watermark) = table_watermarks.vnode_watermarks.get(&key.vnode_part()) {
                !table_watermarks
                    .direction
                    .key_filter_by_watermark(key.key_part(), watermark)
            } else {
                true
            }
        })
    }
735
736 fn gen_inner_key(index: usize) -> String {
737 format!("key{:5}", index)
738 }
739
    /// Core test driver: builds a fixed set of (vnode, index) entries, applies
    /// the given per-vnode `watermarks` both via the reference filter
    /// (`filter_with_watermarks`) and via `PkPrefixSkipWatermarkIterator`,
    /// then asserts the two agree after a rewind, after a seek to every test
    /// key, and after a seek past the last key.
    async fn test_watermark(
        watermarks: impl IntoIterator<Item = (usize, usize)>,
        direction: WatermarkDirection,
    ) {
        let test_index: [(usize, usize); 7] =
            [(0, 2), (0, 3), (0, 4), (1, 1), (1, 3), (4, 2), (8, 1)];
        let items = test_index
            .iter()
            .map(|(vnode, key_index)| gen_key_value(*vnode, *key_index))
            .collect_vec();

        let read_watermark = ReadTableWatermark {
            direction,
            vnode_watermarks: BTreeMap::from_iter(watermarks.into_iter().map(
                |(vnode, key_index)| {
                    (
                        VirtualNode::from_index(vnode),
                        Bytes::from(gen_inner_key(key_index)),
                    )
                },
            )),
        };

        // Builds a fresh (reference, under-test) iterator pair; a fresh pair
        // is needed per positioning since iteration consumes state.
        let gen_iters = || {
            let batch = build_batch(
                filter_with_watermarks(items.clone().into_iter(), read_watermark.clone()),
                TABLE_ID,
            );

            let iter = PkPrefixSkipWatermarkIterator::new(
                build_batch(items.clone().into_iter(), TABLE_ID)
                    .unwrap()
                    .into_forward_iter(),
                PkPrefixSkipWatermarkState::new(BTreeMap::from_iter(once((
                    TABLE_ID,
                    read_watermark.clone(),
                )))),
            );
            (batch.map(|batch| batch.into_forward_iter()), iter)
        };
        let (first, second) = gen_iters();
        assert_iter_eq(first, second, None).await;
        for (vnode, key_index) in &test_index {
            let (first, second) = gen_iters();
            assert_iter_eq(first, second, Some((*vnode, *key_index))).await;
        }
        // Seek past the last existing key: both iterators must be exhausted.
        let (last_vnode, last_key_index) = test_index.last().unwrap();
        let (first, second) = gen_iters();
        assert_iter_eq(first, second, Some((*last_vnode, last_key_index + 1))).await;
    }
790
791 fn gen_key_value(vnode: usize, index: usize) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
792 (
793 gen_key_from_str(VirtualNode::from_index(vnode), &gen_inner_key(index)),
794 SharedBufferValue::Insert(Bytes::copy_from_slice(
795 format!("{}-value-{}", vnode, index).as_bytes(),
796 )),
797 )
798 }
799
800 fn gen_full_key(table_id: TableId, vnode: usize, inner_key: &str) -> FullKey<Bytes> {
801 FullKey {
802 user_key: UserKey {
803 table_id,
804 table_key: gen_key_from_str(VirtualNode::from_index(vnode), inner_key),
805 },
806 epoch_with_gap: EpochWithGap::new_from_epoch(EPOCH),
807 }
808 }
809
    /// Regression helper: a `should_delete` verdict for `later_key` must be
    /// the same whether or not the state previously evaluated
    /// `progressed_key` (which may have popped watermarks off the queue).
    fn assert_pk_prefix_decision_stable_after_progress(
        watermarks: BTreeMap<TableId, ReadTableWatermark>,
        progressed_key: FullKey<Bytes>,
        later_key: FullKey<Bytes>,
    ) {
        let mut fresh_state = PkPrefixSkipWatermarkState::new(watermarks.clone());
        fresh_state.reset_watermark();

        let mut advanced_state = PkPrefixSkipWatermarkState::new(watermarks);
        advanced_state.reset_watermark();

        // Make progress on one state only; verdict is deliberately ignored.
        advanced_state.should_delete(&progressed_key.to_ref(), HummockValue::Put(&[]));

        assert_eq!(
            fresh_state.should_delete(&later_key.to_ref(), HummockValue::Put(&[])),
            advanced_state.should_delete(&later_key.to_ref(), HummockValue::Put(&[])),
            "pk-prefix watermark decision diverged after prior progress",
        );
    }
829
    /// Builds a full key whose pk is the 3-column row
    /// `(col_0, watermark_col, col_2)` serialized with `pk_serde`; the
    /// watermark column sits at pk index 1.
    fn gen_non_pk_full_key(
        table_id: TableId,
        vnode: usize,
        col_0: i32,
        watermark_col: i32,
        col_2: i32,
        pk_serde: &OrderedRowSerde,
    ) -> FullKey<Bytes> {
        let pk = OwnedRow::new(vec![
            Some(ScalarImpl::Int32(col_0)),
            Some(ScalarImpl::Int32(watermark_col)),
            Some(ScalarImpl::Int32(col_2)),
        ]);
        FullKey {
            user_key: UserKey {
                table_id,
                table_key: serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode)),
            },
            epoch_with_gap: EpochWithGap::new_from_epoch(EPOCH),
        }
    }
851
    /// Regression helper (non-pk-prefix variant): a `should_delete` verdict
    /// for `later_key` must be the same whether or not the state previously
    /// evaluated `progressed_key`.
    fn assert_non_pk_prefix_decision_stable_after_progress(
        watermarks: BTreeMap<TableId, ReadTableWatermark>,
        compaction_catalog_agent_ref: Arc<CompactionCatalogAgent>,
        progressed_key: FullKey<Bytes>,
        later_key: FullKey<Bytes>,
    ) {
        let mut fresh_state = NonPkPrefixSkipWatermarkState::new(
            watermarks.clone(),
            compaction_catalog_agent_ref.clone(),
        );
        fresh_state.reset_watermark();

        let mut advanced_state =
            NonPkPrefixSkipWatermarkState::new(watermarks, compaction_catalog_agent_ref);
        advanced_state.reset_watermark();

        // Make progress on one state only; verdict is deliberately ignored.
        advanced_state.should_delete(&progressed_key.to_ref(), HummockValue::Put(&[]));

        assert_eq!(
            fresh_state.should_delete(&later_key.to_ref(), HummockValue::Put(&[])),
            advanced_state.should_delete(&later_key.to_ref(), HummockValue::Put(&[])),
            "non-pk watermark decision diverged after prior progress",
        );
    }
876
    /// No watermarks at all: every entry must be emitted unchanged.
    #[tokio::test]
    async fn test_no_watermark() {
        test_watermark(empty(), WatermarkDirection::Ascending).await;
        test_watermark(empty(), WatermarkDirection::Descending).await;
    }
882
    /// Watermark bound that filters nothing (below/above all data keys).
    #[tokio::test]
    async fn test_too_low_watermark() {
        test_watermark(vec![(0, 0)], WatermarkDirection::Ascending).await;
        test_watermark(vec![(0, 10)], WatermarkDirection::Descending).await;
    }
888
    /// A single watermark on vnode 0 in both directions.
    #[tokio::test]
    async fn test_single_watermark() {
        test_watermark(vec![(0, 3)], WatermarkDirection::Ascending).await;
        test_watermark(vec![(0, 3)], WatermarkDirection::Descending).await;
    }
894
    /// Watermark on a vnode (3) that holds no test data: must be a no-op.
    #[tokio::test]
    async fn test_watermark_vnode_no_data() {
        test_watermark(vec![(3, 3)], WatermarkDirection::Ascending).await;
        test_watermark(vec![(3, 3)], WatermarkDirection::Descending).await;
    }
900
    /// Watermarks chosen so that every entry in every vnode is filtered out.
    #[tokio::test]
    async fn test_filter_all() {
        test_watermark(
            vec![(0, 5), (1, 4), (2, 0), (4, 3), (8, 2)],
            WatermarkDirection::Ascending,
        )
        .await;
        test_watermark(
            vec![(0, 0), (1, 0), (2, 0), (4, 0), (8, 0)],
            WatermarkDirection::Descending,
        )
        .await;
    }
914
    /// Watermarks on non-adjacent vnodes, exercising the queue advance across
    /// vnodes with no data in between.
    #[tokio::test]
    async fn test_advance_multi_vnode() {
        test_watermark(vec![(1, 2), (8, 0)], WatermarkDirection::Ascending).await;
    }
919
    /// Two tables with independent watermarks: evaluating a table-1 key that
    /// lies beyond its watermark (which retires that watermark) must not
    /// change the verdicts later computed for table-2 keys.
    #[test]
    fn test_pk_prefix_watermark_decision_stable_after_prior_progress() {
        let table_1 = TableId::new(1);
        let table_2 = TableId::new(2);
        let vnode = VirtualNode::from_index(0);
        let watermarks = BTreeMap::from_iter([
            (
                table_1,
                ReadTableWatermark {
                    direction: WatermarkDirection::Ascending,
                    vnode_watermarks: BTreeMap::from_iter(once((
                        vnode,
                        Bytes::from_static(b"mid"),
                    ))),
                },
            ),
            (
                table_2,
                ReadTableWatermark {
                    direction: WatermarkDirection::Ascending,
                    vnode_watermarks: BTreeMap::from_iter(once((
                        vnode,
                        Bytes::from_static(b"keep"),
                    ))),
                },
            ),
        ]);

        // Later key below table-2's watermark (would be dropped).
        assert_pk_prefix_decision_stable_after_progress(
            watermarks.clone(),
            gen_full_key(table_1, 0, "z-after-watermark"),
            gen_full_key(table_2, 0, "drop-before-watermark"),
        );

        // Later key above table-2's watermark (would be kept).
        assert_pk_prefix_decision_stable_after_progress(
            watermarks,
            gen_full_key(table_1, 0, "z-after-watermark"),
            gen_full_key(table_2, 0, "z-after-watermark"),
        );
    }
960
    /// Non-pk-prefix variant of the stability regression test: two tables
    /// share the same datum watermark (5 at pk index 1); progressing past
    /// table 1's watermark must not perturb table 2's verdicts.
    #[test]
    fn test_non_pk_prefix_watermark_decision_stable_after_prior_progress() {
        let table_1 = TableId::new(1);
        let table_2 = TableId::new(2);
        let vnode = VirtualNode::from_index(0);
        let watermark_direction = WatermarkDirection::Ascending;
        let watermark_col_serde =
            OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]);
        let pk_serde = OrderedRowSerde::new(
            vec![DataType::Int32, DataType::Int32, DataType::Int32],
            vec![
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
        );
        let watermark_col_idx_in_pk = 1;
        let watermark = serialize_pk(
            OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]),
            &watermark_col_serde,
        );

        let watermarks = BTreeMap::from_iter([table_1, table_2].into_iter().map(|table_id| {
            (
                table_id,
                ReadTableWatermark {
                    direction: watermark_direction,
                    vnode_watermarks: BTreeMap::from_iter(once((vnode, watermark.clone()))),
                },
            )
        }));

        // Catalog agent exposing the same pk/watermark serde for both tables.
        let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
            HashMap::from_iter(
                [table_1, table_2]
                    .into_iter()
                    .map(|table_id| (table_id, VirtualNode::COUNT_FOR_TEST)),
            ),
            HashMap::from_iter([table_1, table_2].into_iter().map(|table_id| {
                (
                    table_id,
                    Some((
                        pk_serde.clone(),
                        watermark_col_serde.clone(),
                        watermark_col_idx_in_pk,
                    )),
                )
            })),
            HashMap::default(),
        ));

        // Later key's watermark column (3) is below the watermark (5).
        assert_non_pk_prefix_decision_stable_after_progress(
            watermarks.clone(),
            compaction_catalog_agent_ref.clone(),
            gen_non_pk_full_key(table_1, 0, 1, 8, 1, &pk_serde),
            gen_non_pk_full_key(table_2, 0, 1, 3, 1, &pk_serde),
        );

        // Later key's watermark column (8) is above the watermark (5).
        assert_non_pk_prefix_decision_stable_after_progress(
            watermarks,
            compaction_catalog_agent_ref,
            gen_non_pk_full_key(table_1, 0, 1, 8, 1, &pk_serde),
            gen_non_pk_full_key(table_2, 0, 1, 8, 1, &pk_serde),
        );
    }
1027
1028 #[tokio::test]
1029 async fn test_non_pk_prefix_watermark() {
1030 fn gen_key_value(
1031 vnode: usize,
1032 col_0: i32,
1033 col_1: i32,
1034 col_2: i32,
1035 col_3: i32,
1036 pk_serde: &OrderedRowSerde,
1037 pk_indices: &[usize],
1038 ) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
            // Build the full 4-column row, then project out the pk columns.
            let r = OwnedRow::new(vec![
                Some(ScalarImpl::Int32(col_0)),
                Some(ScalarImpl::Int32(col_1)),
                Some(ScalarImpl::Int32(col_2)),
                Some(ScalarImpl::Int32(col_3)),
            ]);

            let pk = r.project(pk_indices);

            // Table key = vnode prefix + serialized pk; the value content only aids
            // debugging and is irrelevant to the watermark logic.
            let k1 = serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode));
            let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice(
                format!("{}-value-{}", vnode, col_2).as_bytes(),
            ));
            (k1, v1)
        }

        let watermark_direction = WatermarkDirection::Ascending;

        // Serde for the single watermark column, and for the 3-column pk.
        let watermark_col_serde =
            OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]);
        let pk_serde = OrderedRowSerde::new(
            vec![DataType::Int32, DataType::Int32, DataType::Int32],
            vec![
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
        );

        // pk = (col_0, col_2, col_3); the watermark column sits at pk position 1
        // (i.e. col_2), so this exercises the non-pk-prefix watermark path.
        let pk_indices = vec![0, 2, 3];
        let watermark_col_idx_in_pk = 1;

        {
            // Batch of 10 rows, all in vnode 0, with pk position 1 equal to i.
            let shared_buffer_batch = {
                let mut kv_pairs = (0..10)
                    .map(|i| gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices))
                    .collect_vec();
                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                build_batch(kv_pairs.into_iter(), TABLE_ID)
            }
            .unwrap();

            {
                // Case: no per-vnode watermark at all -> nothing is skipped.
                let read_watermark = ReadTableWatermark {
                    direction: watermark_direction,
                    vnode_watermarks: BTreeMap::default(),
                };

                let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![TABLE_ID]);

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                // All 10 entries must come back, in key order.
                for i in 0..10 {
                    let (k, _v) = gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices);
                    assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref());
                    iter.next().await.unwrap();
                }
                assert!(!iter.is_valid());
            }

            {
                // Case: ascending watermark 5 on vnode 0 -> entries whose watermark
                // column is below 5 are skipped, leaving i in 5..10.
                let watermark = {
                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
                    serialize_pk(r1, &watermark_col_serde)
                };

                let read_watermark = ReadTableWatermark {
                    direction: watermark_direction,
                    vnode_watermarks: BTreeMap::from_iter(once((
                        VirtualNode::from_index(0),
                        watermark.clone(),
                    ))),
                };

                let full_key_filter_key_extractor =
                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

                let table_id_to_vnode =
                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

                // Register (pk serde, watermark-column serde, watermark position in
                // pk) so the state can extract the watermark column from each key.
                let table_id_to_watermark_serde = HashMap::from_iter(once((
                    TABLE_ID,
                    Some((
                        pk_serde.clone(),
                        watermark_col_serde.clone(),
                        watermark_col_idx_in_pk,
                    )),
                )));

                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                    full_key_filter_key_extractor,
                    table_id_to_vnode,
                    table_id_to_watermark_serde,
                    HashMap::default(),
                ));

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                for i in 5..10 {
                    let (k, _v) = gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices);
                    assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref());
                    iter.next().await.unwrap();
                }
                assert!(!iter.is_valid());
            }
        }

        {
            // Batch spread across 2 vnodes (i % 2) with col_0 = 10 - i, so key order
            // differs from generation order; the watermark column is still i.
            let shared_buffer_batch = {
                let mut kv_pairs = (0..10_i32)
                    .map(|i| gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices))
                    .collect_vec();

                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                build_batch(kv_pairs.into_iter(), TABLE_ID)
            };

            {
                // Case: ascending watermark 5 on both vnodes -> only i in 5..10
                // survive, regardless of vnode.
                let watermark = {
                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
                    serialize_pk(r1, &watermark_col_serde)
                };

                let read_watermark = ReadTableWatermark {
                    direction: watermark_direction,
                    vnode_watermarks: BTreeMap::from_iter(
                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
                    ),
                };

                let full_key_filter_key_extractor =
                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

                let table_id_to_vnode =
                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

                let table_id_to_watermark_serde = HashMap::from_iter(once((
                    TABLE_ID,
                    Some((
                        pk_serde.clone(),
                        watermark_col_serde.clone(),
                        watermark_col_idx_in_pk,
                    )),
                )));

                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                    full_key_filter_key_extractor,
                    table_id_to_vnode,
                    table_id_to_watermark_serde,
                    HashMap::default(),
                ));

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                let mut kv_pairs = (5..10_i32)
                    .map(|i| {
                        let (k, v) =
                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
                        (k, v)
                    })
                    .collect_vec();
                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                let mut index = 0;
                // NOTE(review): extra iterator entries panic via the `kv_pairs[index]`
                // access, but a trailing `assert_eq!(index, kv_pairs.len())` would
                // also catch the iterator yielding fewer entries than expected.
                while iter.is_valid() {
                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                    iter.next().await.unwrap();
                    index += 1;
                }
            }

            {
                // NOTE(review): this case looks identical to the previous one (same
                // watermark value, direction, vnodes, and expected output) — possibly
                // a variation (e.g. different watermark) was intended; confirm.
                let watermark = {
                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
                    serialize_pk(r1, &watermark_col_serde)
                };

                let read_watermark = ReadTableWatermark {
                    direction: watermark_direction,
                    vnode_watermarks: BTreeMap::from_iter(
                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
                    ),
                };

                let full_key_filter_key_extractor =
                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

                let table_id_to_vnode =
                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

                let table_id_to_watermark_serde = HashMap::from_iter(once((
                    TABLE_ID,
                    Some((
                        pk_serde.clone(),
                        watermark_col_serde.clone(),
                        watermark_col_idx_in_pk,
                    )),
                )));

                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                    full_key_filter_key_extractor,
                    table_id_to_vnode,
                    table_id_to_watermark_serde,
                    HashMap::default(),
                ));

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                let mut kv_pairs = (5..10_i32)
                    .map(|i| {
                        let (k, v) =
                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
                        (k, v)
                    })
                    .collect_vec();
                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                let mut index = 0;
                while iter.is_valid() {
                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                    iter.next().await.unwrap();
                    index += 1;
                }
            }

            {
                // Case: descending watermark 5 on both vnodes -> the skipped side
                // flips, so the surviving entries are i in 0..=5.
                let watermark = {
                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
                    serialize_pk(r1, &watermark_col_serde)
                };

                let read_watermark = ReadTableWatermark {
                    direction: WatermarkDirection::Descending,
                    vnode_watermarks: BTreeMap::from_iter(
                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
                    ),
                };

                let full_key_filter_key_extractor =
                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

                let table_id_to_vnode =
                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

                let table_id_to_watermark_serde = HashMap::from_iter(once((
                    TABLE_ID,
                    Some((
                        pk_serde.clone(),
                        watermark_col_serde.clone(),
                        watermark_col_idx_in_pk,
                    )),
                )));

                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                    full_key_filter_key_extractor,
                    table_id_to_vnode,
                    table_id_to_watermark_serde,
                    HashMap::default(),
                ));

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                let mut kv_pairs = (0..=5_i32)
                    .map(|i| {
                        let (k, v) =
                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
                        (k, v)
                    })
                    .collect_vec();
                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                let mut index = 0;
                while iter.is_valid() {
                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                    iter.next().await.unwrap();
                    index += 1;
                }
            }
        }
    }
1363
    #[tokio::test]
    async fn test_mix_watermark() {
        // Build a (table_key, insert value) pair for `vnode` from a 4-column row,
        // projecting the pk columns selected by `pk_indices`.
        fn gen_key_value(
            vnode: usize,
            col_0: i32,
            col_1: i32,
            col_2: i32,
            col_3: i32,
            pk_serde: &OrderedRowSerde,
            pk_indices: &[usize],
        ) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
            let r = OwnedRow::new(vec![
                Some(ScalarImpl::Int32(col_0)),
                Some(ScalarImpl::Int32(col_1)),
                Some(ScalarImpl::Int32(col_2)),
                Some(ScalarImpl::Int32(col_3)),
            ]);

            let pk = r.project(pk_indices);

            let k1 = serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode));
            // Value encodes every column for debuggability; its content is
            // irrelevant to the watermark logic under test.
            let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice(
                format!("{}-value-{}-{}-{}-{}", vnode, col_0, col_1, col_2, col_3).as_bytes(),
            ));

            (k1, v1)
        }

        // Serde for the single watermark column and for t1's 3-column pk.
        let watermark_col_serde =
            OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]);
        let t1_pk_serde = OrderedRowSerde::new(
            vec![DataType::Int32, DataType::Int32, DataType::Int32],
            vec![
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
        );

        // t1: pk = (col_0, col_2, col_3), watermark column at pk position 1 — the
        // non-pk-prefix watermark path.
        let t1_pk_indices = vec![0, 2, 3];
        let t1_watermark_col_idx_in_pk = 1;

        // t2: pk = (col_0, col_1, col_2); its watermark is matched against the pk
        // prefix instead.
        let t2_pk_indices = vec![0, 1, 2];

        let t2_pk_serde = OrderedRowSerde::new(
            vec![DataType::Int32, DataType::Int32, DataType::Int32],
            vec![
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
        );

        // Two adjacent table ids so the merged stream interleaves per table.
        let t1_id = TABLE_ID;
        let t2_id = TableId::from(t1_id.as_raw_id() + 1);

        // t1 rows: 2 vnodes (i % 2), col_0 = 10 - i, watermark column (col_2) = i.
        let t1_shared_buffer_batch = {
            let mut kv_pairs = (0..10_i32)
                .map(|i| {
                    gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        i,
                        i,
                        &t1_pk_serde,
                        &t1_pk_indices,
                    )
                })
                .collect_vec();

            kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
            build_batch(kv_pairs.into_iter(), t1_id).unwrap()
        };

        // t2 rows: 2 vnodes, pk prefix col_0 = 10 - i, remaining pk columns zero.
        let t2_shared_buffer_batch = {
            let mut kv_pairs = (0..10_i32)
                .map(|i| {
                    gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        0,
                        0,
                        &t2_pk_serde,
                        &t2_pk_indices,
                    )
                })
                .collect_vec();

            kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
            build_batch(kv_pairs.into_iter(), t2_id).unwrap()
        };

        // Ascending watermark value 5 for t1, on both vnodes.
        let t1_watermark = {
            let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
            serialize_pk(r1, &watermark_col_serde)
        };

        let t1_read_watermark = ReadTableWatermark {
            direction: WatermarkDirection::Ascending,
            vnode_watermarks: BTreeMap::from_iter(
                (0..2).map(|i| (VirtualNode::from_index(i), t1_watermark.clone())),
            ),
        };

        // Ascending watermark value 5 for t2, on both vnodes.
        let t2_watermark = {
            let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
            serialize_pk(r1, &watermark_col_serde)
        };

        let t2_read_watermark = ReadTableWatermark {
            direction: WatermarkDirection::Ascending,
            vnode_watermarks: BTreeMap::from_iter(
                (0..2).map(|i| (VirtualNode::from_index(i), t2_watermark.clone())),
            ),
        };

        {
            // Case 1: only t1's non-pk-prefix watermark is applied to the merged
            // stream; t2 has no configured watermark and must pass through intact.
            let t1_iter = t1_shared_buffer_batch.clone().into_forward_iter();
            let t2_iter = t2_shared_buffer_batch.clone().into_forward_iter();
            let iter_vec = vec![t1_iter, t2_iter];
            let merge_iter = MergeIterator::new(iter_vec);

            let full_key_filter_key_extractor =
                FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

            // NOTE(review): only TABLE_ID (t1) is registered here even though the
            // merged stream also yields t2 keys — presumably fine because no
            // watermark is configured for t2, so its vnode count is never looked
            // up; confirm against the state implementation.
            let table_id_to_vnode =
                HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

            let table_id_to_watermark_serde = HashMap::from_iter(once((
                t1_id,
                Some((
                    t1_pk_serde.clone(),
                    watermark_col_serde.clone(),
                    t1_watermark_col_idx_in_pk,
                )),
            )));

            let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                full_key_filter_key_extractor,
                table_id_to_vnode,
                table_id_to_watermark_serde,
                HashMap::default(),
            ));

            let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                merge_iter,
                NonPkPrefixSkipWatermarkState::new(
                    BTreeMap::from_iter(once((TABLE_ID, t1_read_watermark.clone()))),
                    compaction_catalog_agent_ref,
                ),
            );

            iter.rewind().await.unwrap();
            assert!(iter.is_valid());
            // Expected t1 survivors: watermark column i in 5..10.
            let mut t1_kv_pairs = (5..10_i32)
                .map(|i| {
                    let (k, v) = gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        i,
                        i,
                        &t1_pk_serde,
                        &t1_pk_indices,
                    );
                    (k, v)
                })
                .collect_vec();

            t1_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));

            // Expected t2 output: all 10 entries, untouched.
            let mut t2_kv_pairs = (0..10_i32)
                .map(|i| {
                    gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        0,
                        0,
                        &t2_pk_serde,
                        &t2_pk_indices,
                    )
                })
                .collect_vec();

            t2_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
            // The merged stream yields all t1 entries before any t2 entry; walk
            // t1's survivors first, then t2's.
            let mut index = 0;
            for _ in 0..t1_kv_pairs.len() {
                assert!(t1_kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                iter.next().await.unwrap();
                index += 1;
            }

            assert!(iter.is_valid());
            assert_eq!(t1_kv_pairs.len(), index);

            index = 0;
            for _ in 0..t2_kv_pairs.len() {
                assert!(t2_kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                iter.next().await.unwrap();
                index += 1;
            }

            assert!(!iter.is_valid());
            assert_eq!(t2_kv_pairs.len(), index);
        }

        {
            // Case 2: both watermark kinds stacked — the non-pk-prefix iterator
            // (t1) is wrapped by the pk-prefix iterator (t2), so each table's
            // watermark is applied by its own layer.
            let t1_iter = t1_shared_buffer_batch.clone().into_forward_iter();
            let t2_iter = t2_shared_buffer_batch.clone().into_forward_iter();
            let iter_vec = vec![t1_iter, t2_iter];
            let merge_iter = MergeIterator::new(iter_vec);

            let full_key_filter_key_extractor =
                FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

            // Here both tables are registered (contrast with case 1).
            let table_id_to_vnode = HashMap::from_iter(
                vec![
                    (t1_id, VirtualNode::COUNT_FOR_TEST),
                    (t2_id, VirtualNode::COUNT_FOR_TEST),
                ]
                .into_iter(),
            );

            // t2 maps to None: it has no non-pk-prefix watermark serde.
            let table_id_to_watermark_serde = HashMap::from_iter(
                vec![
                    (
                        t1_id,
                        Some((
                            t1_pk_serde.clone(),
                            watermark_col_serde.clone(),
                            t1_watermark_col_idx_in_pk,
                        )),
                    ),
                    (t2_id, None),
                ]
                .into_iter(),
            );

            let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                full_key_filter_key_extractor,
                table_id_to_vnode,
                table_id_to_watermark_serde,
                HashMap::default(),
            ));

            let non_pk_prefix_iter = NonPkPrefixSkipWatermarkIterator::new(
                merge_iter,
                NonPkPrefixSkipWatermarkState::new(
                    BTreeMap::from_iter(once((t1_id, t1_read_watermark.clone()))),
                    compaction_catalog_agent_ref.clone(),
                ),
            );

            let mut mix_iter = PkPrefixSkipWatermarkIterator::new(
                non_pk_prefix_iter,
                PkPrefixSkipWatermarkState::new(BTreeMap::from_iter(once((
                    t2_id,
                    t2_read_watermark.clone(),
                )))),
            );

            mix_iter.rewind().await.unwrap();
            assert!(mix_iter.is_valid());

            // Expected t1 survivors: watermark column i in 5..10.
            let mut t1_kv_pairs = (5..10_i32)
                .map(|i| {
                    let (k, v) = gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        i,
                        i,
                        &t1_pk_serde,
                        &t1_pk_indices,
                    );
                    (k, v)
                })
                .collect_vec();

            t1_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));

            // Expected t2 survivors: pk prefix col_0 = 10 - i at or above the
            // ascending watermark 5, i.e. i in 0..=5.
            let mut t2_kv_pairs = (0..=5_i32)
                .map(|i| {
                    gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        0,
                        0,
                        &t2_pk_serde,
                        &t2_pk_indices,
                    )
                })
                .collect_vec();

            t2_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));

            // As in case 1, all t1 output precedes all t2 output.
            let mut index = 0;
            for _ in 0..t1_kv_pairs.len() {
                assert!(
                    t1_kv_pairs[index].0.as_ref() == mix_iter.key().user_key.table_key.as_ref()
                );
                mix_iter.next().await.unwrap();
                index += 1;
            }

            assert!(mix_iter.is_valid());
            assert_eq!(t1_kv_pairs.len(), index);

            index = 0;

            for _ in 0..t2_kv_pairs.len() {
                assert!(
                    t2_kv_pairs[index].0.as_ref() == mix_iter.key().user_key.table_key.as_ref()
                );
                mix_iter.next().await.unwrap();
                index += 1;
            }

            assert!(!mix_iter.is_valid());
            assert_eq!(t2_kv_pairs.len(), index);
        }
    }
1691}