risingwave_storage/hummock/iterator/
skip_watermark.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::{BTreeMap, VecDeque};
17
18use bytes::Bytes;
19use risingwave_common::catalog::TableId;
20use risingwave_common::hash::VirtualNode;
21use risingwave_common::row::Row;
22use risingwave_common::types::Datum;
23use risingwave_common::util::row_serde::OrderedRowSerde;
24use risingwave_hummock_sdk::compaction_group::hummock_version_ext::safe_epoch_read_table_watermarks_impl;
25use risingwave_hummock_sdk::key::FullKey;
26use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
27use risingwave_hummock_sdk::table_watermark::{
28    ReadTableWatermark, TableWatermarks, WatermarkDirection,
29};
30
31use super::SkipWatermarkState;
32use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, ValueWatermarkColumnSerdeRef};
33use crate::hummock::HummockResult;
34use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
35use crate::hummock::value::HummockValue;
36use crate::monitor::StoreLocalStatistic;
37
/// A forward iterator adapter that transparently skips key-value pairs filtered
/// out by table watermarks, while accounting the skipped entries into
/// per-table statistics.
pub struct SkipWatermarkIterator<I, S> {
    /// The underlying forward iterator being wrapped.
    inner: I,
    /// Watermark bookkeeping used to decide which entries to skip.
    state: S,
    /// The stats of skipped key-value pairs for each table.
    skipped_entry_table_stats: TableStatsMap,
    /// The id of table currently undergoing processing.
    last_table_id: Option<TableId>,
    /// The stats of table currently undergoing processing.
    last_table_stats: TableStats,
}
48
impl<I: HummockIterator<Direction = Forward>, S: SkipWatermarkState> SkipWatermarkIterator<I, S> {
    /// Wraps `inner` with the given watermark `state`; skip statistics start empty.
    pub fn new(inner: I, state: S) -> Self {
        Self {
            inner,
            state,
            skipped_entry_table_stats: TableStatsMap::default(),
            last_table_id: None,
            last_table_stats: TableStats::default(),
        }
    }

    /// Rebuilds the state's queue of remaining watermarks from scratch.
    fn reset_watermark(&mut self) {
        self.state.reset_watermark();
    }

    /// Clears all per-table skip statistics accumulated so far.
    fn reset_skipped_entry_table_stats(&mut self) {
        self.skipped_entry_table_stats = TableStatsMap::default();
        self.last_table_id = None;
        self.last_table_stats = TableStats::default();
    }

    /// Advance the key until iterator invalid or the current key will not be filtered by the latest watermark.
    /// Calling this method should ensure that the first remaining watermark has been advanced to the current key.
    async fn advance_key_and_watermark(&mut self) -> HummockResult<()> {
        // advance key and watermark in an interleave manner until nothing
        // changed after the method is called.
        while self.inner.is_valid() {
            if !self
                .state
                .should_delete(&self.inner.key(), self.inner.value())
            {
                break;
            }

            // When the skipped entry belongs to a different table than the one
            // currently being accumulated, flush the pending delta first and
            // start accumulating for the new table.
            if self
                .last_table_id
                .is_none_or(|last_table_id| last_table_id != self.inner.key().user_key.table_id)
            {
                self.add_last_table_stats();
                self.last_table_id = Some(self.inner.key().user_key.table_id);
            }
            // Skipped entries are recorded as negative deltas, to be applied
            // against the table's stats by downstream consumers.
            self.last_table_stats.total_key_count -= 1;
            self.last_table_stats.total_key_size -= self.inner.key().encoded_len() as i64;
            self.last_table_stats.total_value_size -= self.inner.value().encoded_len() as i64;

            self.inner.next().await?;
        }
        // Flush whatever is still pending when the loop exits (iterator
        // exhausted or current key survives the watermark).
        self.add_last_table_stats();
        Ok(())
    }

    /// Merges the pending per-table delta into `skipped_entry_table_stats` and
    /// resets `last_table_id`/`last_table_stats`. No-op when no table is
    /// currently being accumulated.
    fn add_last_table_stats(&mut self) {
        let Some(last_table_id) = self.last_table_id.take() else {
            return;
        };
        let delta = std::mem::take(&mut self.last_table_stats);
        let e = self
            .skipped_entry_table_stats
            .entry(last_table_id)
            .or_default();
        e.total_key_count += delta.total_key_count;
        e.total_key_size += delta.total_key_size;
        e.total_value_size += delta.total_value_size;
    }
}
116
117impl<I: HummockIterator<Direction = Forward>, S: SkipWatermarkState> HummockIterator
118    for SkipWatermarkIterator<I, S>
119{
120    type Direction = Forward;
121
122    async fn next(&mut self) -> HummockResult<()> {
123        self.inner.next().await?;
124        // Check whether there is any remaining watermark and return early to
125        // avoid calling the async `advance_key_and_watermark`, since in benchmark
126        // performance downgrade is observed without this early return.
127        if self.state.has_watermark() {
128            self.advance_key_and_watermark().await?;
129        }
130        Ok(())
131    }
132
133    fn key(&self) -> FullKey<&[u8]> {
134        self.inner.key()
135    }
136
137    fn value(&self) -> HummockValue<&[u8]> {
138        self.inner.value()
139    }
140
141    fn is_valid(&self) -> bool {
142        self.inner.is_valid()
143    }
144
145    async fn rewind(&mut self) -> HummockResult<()> {
146        self.reset_watermark();
147        self.reset_skipped_entry_table_stats();
148        self.inner.rewind().await?;
149        self.advance_key_and_watermark().await?;
150        Ok(())
151    }
152
153    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
154        self.reset_watermark();
155        self.reset_skipped_entry_table_stats();
156        self.inner.seek(key).await?;
157        self.advance_key_and_watermark().await?;
158        Ok(())
159    }
160
161    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
162        add_table_stats_map(
163            &mut stats.skipped_by_watermark_table_stats,
164            &self.skipped_entry_table_stats,
165        );
166        self.inner.collect_local_statistic(stats)
167    }
168
169    fn value_meta(&self) -> ValueMeta {
170        self.inner.value_meta()
171    }
172}
/// Watermark state that filters entries by comparing the serialized
/// primary-key prefix (the key bytes after the vnode) against per-vnode
/// watermarks.
pub struct PkPrefixSkipWatermarkState {
    /// All watermarks keyed by table id; the source from which
    /// `remain_watermarks` is rebuilt on `reset_watermark`.
    watermarks: BTreeMap<TableId, ReadTableWatermark>,
    /// Watermarks not yet passed by the iterator, ordered by `(table_id, vnode)`.
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Bytes)>,
}
177
impl SkipWatermarkState for PkPrefixSkipWatermarkState {
    #[inline(always)]
    fn has_watermark(&self) -> bool {
        !self.remain_watermarks.is_empty()
    }

    /// Returns whether the entry at `key` is filtered by the first remaining
    /// watermark, advancing the watermark queue first if the key has moved
    /// past the front watermark's `(table_id, vnode)`.
    fn should_delete(&mut self, key: &FullKey<&[u8]>, value: HummockValue<&[u8]>) -> bool {
        if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            let key_table_id = key.user_key.table_id;
            let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
            match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
                Ordering::Less => {
                    // The key is before the first watermark: cannot be filtered.
                    return false;
                }
                Ordering::Equal => {
                    // Same (table_id, vnode): let the direction decide.
                    return direction.key_filter_by_watermark(inner_key, watermark);
                }
                Ordering::Greater => {
                    // The current key has advanced over the watermark.
                    // We may advance the watermark before advancing the key.
                    return self.advance_watermark(key, value);
                }
            }
        }
        false
    }

    /// Rebuilds `remain_watermarks` from `watermarks`, flattened in ascending
    /// `(table_id, vnode)` order (BTreeMap iteration order).
    fn reset_watermark(&mut self) {
        self.remain_watermarks = self
            .watermarks
            .iter()
            .flat_map(|(table_id, read_watermarks)| {
                read_watermarks
                    .vnode_watermarks
                    .iter()
                    .map(|(vnode, watermarks)| {
                        (
                            *table_id,
                            *vnode,
                            read_watermarks.direction,
                            watermarks.clone(),
                        )
                    })
            })
            .collect();
    }

    /// Advance watermark until no watermark remains or the first watermark can possibly
    /// filter out the current or future key.
    ///
    /// Return a flag indicating whether the current key will be filtered by the current watermark.
    fn advance_watermark(&mut self, key: &FullKey<&[u8]>, _value: HummockValue<&[u8]>) -> bool {
        let key_table_id = key.user_key.table_id;
        let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
        while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
                Ordering::Less => {
                    // Watermark is behind the key: drop it and look at the next one.
                    self.remain_watermarks.pop_front();
                    continue;
                }
                Ordering::Equal => {
                    match direction {
                        WatermarkDirection::Ascending => {
                            match inner_key.cmp(watermark.as_ref()) {
                                Ordering::Less => {
                                    // The current key will be filtered by the watermark.
                                    // Return true to further advance the key.
                                    return true;
                                }
                                Ordering::Equal | Ordering::Greater => {
                                    // The current key has passed the watermark.
                                    // Advance the next watermark.
                                    self.remain_watermarks.pop_front();
                                    // Since it is impossible for a (table_id, vnode) tuple to have multiple
                                    // watermark, after the pop_front, the next (table_id, vnode) must have
                                    // exceeded the current key, and we can directly return and mark that the
                                    // current key is not filtered by the watermark at the front.
                                    #[cfg(debug_assertions)]
                                    {
                                        if let Some((next_table_id, next_vnode, _, _)) =
                                            self.remain_watermarks.front()
                                        {
                                            assert!(
                                                (next_table_id, next_vnode)
                                                    > (&key_table_id, &key_vnode)
                                            );
                                        }
                                    }
                                    return false;
                                }
                            }
                        }
                        WatermarkDirection::Descending => {
                            return match inner_key.cmp(watermark.as_ref()) {
                                // Current key as not reached the watermark. Just return.
                                Ordering::Less | Ordering::Equal => false,
                                // Current key will be filtered by the watermark.
                                // Return true to further advance the key.
                                Ordering::Greater => true,
                            };
                        }
                    }
                }
                Ordering::Greater => {
                    // The next watermark is ahead of the key: nothing to filter yet.
                    return false;
                }
            }
        }
        false
    }
}
289
290impl PkPrefixSkipWatermarkState {
291    pub fn new(watermarks: BTreeMap<TableId, ReadTableWatermark>) -> Self {
292        Self {
293            remain_watermarks: VecDeque::new(),
294            watermarks,
295        }
296    }
297
298    pub fn from_safe_epoch_watermarks(
299        safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
300    ) -> Self {
301        let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
302        Self::new(watermarks)
303    }
304}
305
/// Watermark state for watermarks defined on a pk column that is *not* a key
/// prefix: the key must be deserialized to extract the watermark column before
/// comparison.
pub struct NonPkPrefixSkipWatermarkState {
    /// All watermarks keyed by table id; source for `remain_watermarks`.
    watermarks: BTreeMap<TableId, ReadTableWatermark>,
    /// Watermarks not yet passed, ordered by `(table_id, vnode)`; the datum is
    /// the deserialized watermark column value.
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Datum)>,
    /// Catalog access used to resolve per-table serdes.
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,

    /// Cached (pk-prefix serde, watermark-column serde, watermark column index
    /// in pk) for the table in `last_table_id`.
    last_serde: Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
    /// Table id the cached `last_serde` belongs to.
    last_table_id: Option<TableId>,
}
314
315impl NonPkPrefixSkipWatermarkState {
316    pub fn new(
317        watermarks: BTreeMap<TableId, ReadTableWatermark>,
318        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
319    ) -> Self {
320        Self {
321            remain_watermarks: VecDeque::new(),
322            watermarks,
323            compaction_catalog_agent_ref,
324            last_serde: None,
325            last_table_id: None,
326        }
327    }
328
329    pub fn from_safe_epoch_watermarks(
330        safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
331        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
332    ) -> Self {
333        let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
334        Self::new(watermarks, compaction_catalog_agent_ref)
335    }
336}
337
338impl SkipWatermarkState for NonPkPrefixSkipWatermarkState {
339    #[inline(always)]
340    fn has_watermark(&self) -> bool {
341        !self.remain_watermarks.is_empty()
342    }
343
344    fn should_delete(&mut self, key: &FullKey<&[u8]>, value: HummockValue<&[u8]>) -> bool {
345        if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
346            let key_table_id = key.user_key.table_id;
347            {
348                if self
349                    .last_table_id
350                    .is_none_or(|last_table_id| last_table_id != key_table_id)
351                {
352                    self.last_table_id = Some(key_table_id);
353                    self.last_serde = self.compaction_catalog_agent_ref.watermark_serde(*table_id);
354                }
355            }
356
357            let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
358            match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
359                Ordering::Less => {
360                    return false;
361                }
362                Ordering::Equal => {
363                    let (pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk) =
364                        self.last_serde.as_ref().unwrap();
365                    let row = pk_prefix_serde
366                        .deserialize(inner_key)
367                        .unwrap_or_else(|_| {
368                            panic!("Failed to deserialize pk_prefix inner_key {:?} serde data_types {:?} order_types {:?}", inner_key, pk_prefix_serde.get_data_types(), pk_prefix_serde.get_order_types());
369                        });
370                    let watermark_col_in_pk = row.datum_at(*watermark_col_idx_in_pk);
371                    return direction.datum_filter_by_watermark(
372                        watermark_col_in_pk,
373                        watermark,
374                        watermark_col_serde.get_order_types()[0],
375                    );
376                }
377                Ordering::Greater => {
378                    // The current key has advanced over the watermark.
379                    // We may advance the watermark before advancing the key.
380                    return self.advance_watermark(key, value);
381                }
382            }
383        }
384        false
385    }
386
387    fn reset_watermark(&mut self) {
388        self.remain_watermarks = self
389            .watermarks
390            .iter()
391            .flat_map(|(table_id, read_watermarks)| {
392                let watermark_serde = self.compaction_catalog_agent_ref.watermark_serde(*table_id).map(|(_pk_serde, watermark_serde, _watermark_col_idx_in_pk)| watermark_serde);
393
394                read_watermarks
395                    .vnode_watermarks
396                    .iter()
397                    .flat_map(move |(vnode, watermarks)| {
398                        // TODO(ttl): if the watermark column is in the value, we may get a `None` here, support it.
399                        let watermark_serde = watermark_serde.as_ref()?;
400                        Some((
401                            *table_id,
402                            *vnode,
403                            read_watermarks.direction,
404                            {
405                                let row = watermark_serde
406                                .deserialize(watermarks).unwrap_or_else(|_| {
407                                    panic!("Failed to deserialize watermark {:?} serde data_types {:?} order_types {:?}", watermarks, watermark_serde.get_data_types(), watermark_serde.get_order_types());
408                                });
409                                row[0].clone()
410                            },
411                        ))
412                    })
413            })
414            .collect();
415    }
416
417    fn advance_watermark(&mut self, key: &FullKey<&[u8]>, _value: HummockValue<&[u8]>) -> bool {
418        let key_table_id = key.user_key.table_id;
419        let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
420        while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
421            match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
422                Ordering::Less => {
423                    self.remain_watermarks.pop_front();
424                    continue;
425                }
426                Ordering::Equal => {
427                    let (pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk) =
428                        self.last_serde.as_ref().unwrap();
429
430                    let row = pk_prefix_serde
431                        .deserialize(inner_key)
432                        .unwrap_or_else(|_| {
433                            panic!("Failed to deserialize pk_prefix inner_key {:?} serde data_types {:?} order_types {:?}", inner_key, pk_prefix_serde.get_data_types(), pk_prefix_serde.get_order_types());
434                        });
435                    let watermark_col_in_pk = row.datum_at(*watermark_col_idx_in_pk);
436
437                    return direction.datum_filter_by_watermark(
438                        watermark_col_in_pk,
439                        watermark,
440                        watermark_col_serde.get_order_types()[0],
441                    );
442                }
443                Ordering::Greater => {
444                    return false;
445                }
446            }
447        }
448        false
449    }
450}
451
/// Skip-watermark iterator driven by primary-key-prefix watermarks.
pub type PkPrefixSkipWatermarkIterator<I> = SkipWatermarkIterator<I, PkPrefixSkipWatermarkState>;

/// Skip-watermark iterator driven by watermarks on a non-prefix pk column.
pub type NonPkPrefixSkipWatermarkIterator<I> =
    SkipWatermarkIterator<I, NonPkPrefixSkipWatermarkState>;

/// Skip-watermark iterator driven by watermarks stored in the row value.
pub type ValueSkipWatermarkIterator<I> = SkipWatermarkIterator<I, ValueSkipWatermarkState>;
458
/// Watermark state for watermarks on a column stored in the row *value*:
/// the watermark column is deserialized from the value bytes rather than the key.
pub struct ValueSkipWatermarkState {
    /// All watermarks keyed by table id; source for `remain_watermarks`.
    pub watermarks: BTreeMap<TableId, ReadTableWatermark>,
    /// Watermarks not yet passed, ordered by `(table_id, vnode)`.
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Bytes)>,
    /// Catalog access used to resolve per-table value-column serdes.
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    /// Cached value-watermark serde for the table in `last_table_id`;
    /// `None` when the table has no resolvable value serde.
    last_serde: Option<ValueWatermarkColumnSerdeRef>,
    /// Table id the cached `last_serde` belongs to.
    last_table_id: Option<TableId>,
}
466
467impl ValueSkipWatermarkState {
468    pub fn new(
469        watermarks: BTreeMap<TableId, ReadTableWatermark>,
470        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
471    ) -> Self {
472        Self {
473            remain_watermarks: VecDeque::new(),
474            watermarks,
475            compaction_catalog_agent_ref,
476            last_serde: None,
477            last_table_id: None,
478        }
479    }
480
481    pub fn from_safe_epoch_watermarks(
482        safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
483        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
484    ) -> Self {
485        let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
486        Self::new(watermarks, compaction_catalog_agent_ref)
487    }
488
489    pub fn may_delete(&self, key: &FullKey<&[u8]>) -> bool {
490        let table_id = key.user_key.table_id;
491        self.watermarks.contains_key(&table_id)
492    }
493}
494
495impl SkipWatermarkState for ValueSkipWatermarkState {
496    #[inline(always)]
497    fn has_watermark(&self) -> bool {
498        !self.remain_watermarks.is_empty()
499    }
500
501    fn should_delete(&mut self, key: &FullKey<&[u8]>, value: HummockValue<&[u8]>) -> bool {
502        if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
503            let key_table_id = key.user_key.table_id;
504            let key_vnode = key.user_key.table_key.vnode_part();
505            if self
506                .last_table_id
507                .is_none_or(|last_table_id| last_table_id != key_table_id)
508            {
509                self.last_table_id = Some(key_table_id);
510                self.last_serde = self
511                    .compaction_catalog_agent_ref
512                    .value_watermark_serde(*table_id);
513            }
514            match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
515                Ordering::Less => {
516                    return false;
517                }
518                Ordering::Equal => {
519                    let HummockValue::Put(value) = value else {
520                        return false;
521                    };
522                    let Some(ref last_serde) = self.last_serde else {
523                        return false;
524                    };
525                    let Ok(watermark_column_value) = last_serde.deserialize(value) else {
526                        tracing::error!(
527                            ?table_id,
528                            ?vnode,
529                            ?value,
530                            "Failed to deserialize watermark column"
531                        );
532                        return false;
533                    };
534                    let Some(watermark_column_value) = watermark_column_value else {
535                        tracing::debug!(
536                            ?table_id,
537                            ?vnode,
538                            ?value,
539                            "The specified watermark column was not found in the value, likely due to the use of a non-current catalog. Restarting the compactor should resolve this issue."
540                        );
541                        return false;
542                    };
543                    return direction.key_filter_by_watermark(&watermark_column_value, watermark);
544                }
545                Ordering::Greater => {
546                    // The current key has advanced over the watermark.
547                    // We may advance the watermark before advancing the key.
548                    return self.advance_watermark(key, value);
549                }
550            }
551        }
552        false
553    }
554
555    fn reset_watermark(&mut self) {
556        self.remain_watermarks = self
557            .watermarks
558            .iter()
559            .flat_map(|(table_id, read_watermarks)| {
560                read_watermarks
561                    .vnode_watermarks
562                    .iter()
563                    .map(|(vnode, watermarks)| {
564                        (
565                            *table_id,
566                            *vnode,
567                            read_watermarks.direction,
568                            watermarks.clone(),
569                        )
570                    })
571            })
572            .collect();
573    }
574
575    fn advance_watermark(&mut self, key: &FullKey<&[u8]>, value: HummockValue<&[u8]>) -> bool {
576        let key_table_id = key.user_key.table_id;
577        let key_vnode = key.user_key.table_key.vnode_part();
578        while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
579            match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
580                Ordering::Less => {
581                    self.remain_watermarks.pop_front();
582                    continue;
583                }
584                Ordering::Equal => {
585                    let HummockValue::Put(value) = value else {
586                        return false;
587                    };
588                    let Some(ref last_serde) = self.last_serde else {
589                        return false;
590                    };
591                    let Ok(watermark_column_value) = last_serde.deserialize(value) else {
592                        tracing::error!(
593                            ?table_id,
594                            ?vnode,
595                            ?value,
596                            "Failed to deserialize watermark column."
597                        );
598                        return false;
599                    };
600                    let Some(watermark_column_value) = watermark_column_value else {
601                        tracing::warn!(
602                            ?table_id,
603                            ?vnode,
604                            ?value,
605                            "The specified watermark column was not found in the value, likely due to the use of a non-current catalog. Restarting the compactor should resolve this issue."
606                        );
607                        return false;
608                    };
609                    return direction.key_filter_by_watermark(&watermark_column_value, watermark);
610                }
611                Ordering::Greater => {
612                    return false;
613                }
614            }
615        }
616        false
617    }
618}
619
620#[cfg(test)]
621mod tests {
622    use std::collections::{BTreeMap, HashMap};
623    use std::iter::{empty, once};
624    use std::sync::Arc;
625
626    use bytes::Bytes;
627    use itertools::Itertools;
628    use risingwave_common::catalog::TableId;
629    use risingwave_common::hash::VirtualNode;
630    use risingwave_common::row::{OwnedRow, RowExt};
631    use risingwave_common::types::{DataType, ScalarImpl};
632    use risingwave_common::util::epoch::test_epoch;
633    use risingwave_common::util::row_serde::OrderedRowSerde;
634    use risingwave_common::util::sort_util::OrderType;
635    use risingwave_hummock_sdk::EpochWithGap;
636    use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey, gen_key_from_str};
637    use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, WatermarkDirection};
638
639    use super::PkPrefixSkipWatermarkState;
640    use crate::compaction_catalog_manager::{
641        CompactionCatalogAgent, FilterKeyExtractorImpl, FullKeyFilterKeyExtractor,
642    };
643    use crate::hummock::iterator::{
644        HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
645        NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator,
646    };
647    use crate::hummock::shared_buffer::shared_buffer_batch::{
648        SharedBufferBatch, SharedBufferValue,
649    };
650    use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode};
651
    // Epoch and table id shared by all the test fixtures below.
    const EPOCH: u64 = test_epoch(1);
    const TABLE_ID: TableId = TableId::new(233);
654
    /// Asserts that `second` (the iterator under test) yields exactly the same
    /// key-value sequence as `first` (the expected iterator; `None` means
    /// "expected empty"), optionally after seeking both to the key built from
    /// `(vnode, key_index)`.
    async fn assert_iter_eq(
        mut first: Option<impl HummockIterator>,
        mut second: impl HummockIterator,
        seek_key: Option<(usize, usize)>,
    ) {
        // Position both iterators: seek to the generated key, or rewind.
        if let Some((vnode, key_index)) = seek_key {
            let (seek_key, _) = gen_key_value(vnode, key_index);
            let full_key = FullKey {
                user_key: UserKey {
                    table_id: TABLE_ID,
                    table_key: seek_key,
                },
                epoch_with_gap: EpochWithGap::new_from_epoch(EPOCH),
            };
            if let Some(first) = &mut first {
                first.seek(full_key.to_ref()).await.unwrap();
            }
            second.seek(full_key.to_ref()).await.unwrap()
        } else {
            if let Some(first) = &mut first {
                first.rewind().await.unwrap();
            }
            second.rewind().await.unwrap();
        }

        // Walk both iterators in lockstep, comparing each entry.
        if let Some(first) = &mut first {
            while first.is_valid() {
                assert!(second.is_valid());
                let first_key = first.key();
                let second_key = second.key();
                assert_eq!(first_key, second_key);
                assert_eq!(first.value(), second.value());
                first.next().await.unwrap();
                second.next().await.unwrap();
            }
        }
        // `second` must be exhausted as well (and empty when `first` is `None`).
        assert!(!second.is_valid());
    }
693
694    fn build_batch(
695        pairs: impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)>,
696        table_id: TableId,
697    ) -> Option<SharedBufferBatch> {
698        let pairs: Vec<_> = pairs.collect();
699        if pairs.is_empty() {
700            None
701        } else {
702            Some(SharedBufferBatch::for_test(pairs, EPOCH, table_id))
703        }
704    }
705
706    fn filter_with_watermarks(
707        iter: impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)>,
708        table_watermarks: ReadTableWatermark,
709    ) -> impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)> {
710        iter.filter(move |(key, _)| {
711            if let Some(watermark) = table_watermarks.vnode_watermarks.get(&key.vnode_part()) {
712                !table_watermarks
713                    .direction
714                    .key_filter_by_watermark(key.key_part(), watermark)
715            } else {
716                true
717            }
718        })
719    }
720
721    fn gen_inner_key(index: usize) -> String {
722        format!("key{:5}", index)
723    }
724
    /// Shared test driver: builds a fixed key set, applies `watermarks` both
    /// via the reference `filter_with_watermarks` and via
    /// `PkPrefixSkipWatermarkIterator`, and asserts the two agree under
    /// rewind and under seeking to every key (plus one past the last).
    async fn test_watermark(
        watermarks: impl IntoIterator<Item = (usize, usize)>,
        direction: WatermarkDirection,
    ) {
        // Fixed (vnode, key_index) fixture spanning several vnodes.
        let test_index: [(usize, usize); 7] =
            [(0, 2), (0, 3), (0, 4), (1, 1), (1, 3), (4, 2), (8, 1)];
        let items = test_index
            .iter()
            .map(|(vnode, key_index)| gen_key_value(*vnode, *key_index))
            .collect_vec();

        let read_watermark = ReadTableWatermark {
            direction,
            vnode_watermarks: BTreeMap::from_iter(watermarks.into_iter().map(
                |(vnode, key_index)| {
                    (
                        VirtualNode::from_index(vnode),
                        Bytes::from(gen_inner_key(key_index)),
                    )
                },
            )),
        };

        // Builds a fresh (expected, under-test) iterator pair, since iterators
        // are consumed by each assertion round.
        let gen_iters = || {
            let batch = build_batch(
                filter_with_watermarks(items.clone().into_iter(), read_watermark.clone()),
                TABLE_ID,
            );

            let iter = PkPrefixSkipWatermarkIterator::new(
                build_batch(items.clone().into_iter(), TABLE_ID)
                    .unwrap()
                    .into_forward_iter(),
                PkPrefixSkipWatermarkState::new(BTreeMap::from_iter(once((
                    TABLE_ID,
                    read_watermark.clone(),
                )))),
            );
            (batch.map(|batch| batch.into_forward_iter()), iter)
        };
        // Full-scan equality.
        let (first, second) = gen_iters();
        assert_iter_eq(first, second, None).await;
        // Equality after seeking to each existing key.
        for (vnode, key_index) in &test_index {
            let (first, second) = gen_iters();
            assert_iter_eq(first, second, Some((*vnode, *key_index))).await;
        }
        // Equality after seeking past the last key.
        let (last_vnode, last_key_index) = test_index.last().unwrap();
        let (first, second) = gen_iters();
        assert_iter_eq(first, second, Some((*last_vnode, last_key_index + 1))).await;
    }
775
776    fn gen_key_value(vnode: usize, index: usize) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
777        (
778            gen_key_from_str(VirtualNode::from_index(vnode), &gen_inner_key(index)),
779            SharedBufferValue::Insert(Bytes::copy_from_slice(
780                format!("{}-value-{}", vnode, index).as_bytes(),
781            )),
782        )
783    }
784
785    #[tokio::test]
786    async fn test_no_watermark() {
787        test_watermark(empty(), WatermarkDirection::Ascending).await;
788        test_watermark(empty(), WatermarkDirection::Descending).await;
789    }
790
791    #[tokio::test]
792    async fn test_too_low_watermark() {
793        test_watermark(vec![(0, 0)], WatermarkDirection::Ascending).await;
794        test_watermark(vec![(0, 10)], WatermarkDirection::Descending).await;
795    }
796
797    #[tokio::test]
798    async fn test_single_watermark() {
799        test_watermark(vec![(0, 3)], WatermarkDirection::Ascending).await;
800        test_watermark(vec![(0, 3)], WatermarkDirection::Descending).await;
801    }
802
803    #[tokio::test]
804    async fn test_watermark_vnode_no_data() {
805        test_watermark(vec![(3, 3)], WatermarkDirection::Ascending).await;
806        test_watermark(vec![(3, 3)], WatermarkDirection::Descending).await;
807    }
808
809    #[tokio::test]
810    async fn test_filter_all() {
811        test_watermark(
812            vec![(0, 5), (1, 4), (2, 0), (4, 3), (8, 2)],
813            WatermarkDirection::Ascending,
814        )
815        .await;
816        test_watermark(
817            vec![(0, 0), (1, 0), (2, 0), (4, 0), (8, 0)],
818            WatermarkDirection::Descending,
819        )
820        .await;
821    }
822
823    #[tokio::test]
824    async fn test_advance_multi_vnode() {
825        test_watermark(vec![(1, 2), (8, 0)], WatermarkDirection::Ascending).await;
826    }
827
828    #[tokio::test]
829    async fn test_non_pk_prefix_watermark() {
830        fn gen_key_value(
831            vnode: usize,
832            col_0: i32,
833            col_1: i32,
834            col_2: i32,
835            col_3: i32,
836            pk_serde: &OrderedRowSerde,
837            pk_indices: &[usize],
838        ) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
839            let r = OwnedRow::new(vec![
840                Some(ScalarImpl::Int32(col_0)),
841                Some(ScalarImpl::Int32(col_1)),
842                Some(ScalarImpl::Int32(col_2)), // watermark column
843                Some(ScalarImpl::Int32(col_3)),
844            ]);
845
846            let pk = r.project(pk_indices);
847
848            let k1 = serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode));
849            let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice(
850                format!("{}-value-{}", vnode, col_2).as_bytes(),
851            ));
852            (k1, v1)
853        }
854
855        let watermark_direction = WatermarkDirection::Ascending;
856
857        let watermark_col_serde =
858            OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]);
859        let pk_serde = OrderedRowSerde::new(
860            vec![DataType::Int32, DataType::Int32, DataType::Int32],
861            vec![
862                OrderType::ascending(),
863                OrderType::ascending(),
864                OrderType::ascending(),
865            ],
866        );
867
868        let pk_indices = vec![0, 2, 3];
869        let watermark_col_idx_in_pk = 1;
870
871        {
872            // test single vnode
873            let shared_buffer_batch = {
874                let mut kv_pairs = (0..10)
875                    .map(|i| gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices))
876                    .collect_vec();
877                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
878                build_batch(kv_pairs.into_iter(), TABLE_ID)
879            }
880            .unwrap();
881
882            {
883                // empty read watermark
884                let read_watermark = ReadTableWatermark {
885                    direction: watermark_direction,
886                    vnode_watermarks: BTreeMap::default(),
887                };
888
889                let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![TABLE_ID]);
890
891                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
892                    shared_buffer_batch.clone().into_forward_iter(),
893                    NonPkPrefixSkipWatermarkState::new(
894                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
895                        compaction_catalog_agent_ref,
896                    ),
897                );
898
899                iter.rewind().await.unwrap();
900                assert!(iter.is_valid());
901                for i in 0..10 {
902                    let (k, _v) = gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices);
903                    assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref());
904                    iter.next().await.unwrap();
905                }
906                assert!(!iter.is_valid());
907            }
908
909            {
910                // test watermark
911                let watermark = {
912                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
913                    serialize_pk(r1, &watermark_col_serde)
914                };
915
916                let read_watermark = ReadTableWatermark {
917                    direction: watermark_direction,
918                    vnode_watermarks: BTreeMap::from_iter(once((
919                        VirtualNode::from_index(0),
920                        watermark.clone(),
921                    ))),
922                };
923
924                let full_key_filter_key_extractor =
925                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
926
927                let table_id_to_vnode =
928                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));
929
930                let table_id_to_watermark_serde = HashMap::from_iter(once((
931                    TABLE_ID,
932                    Some((
933                        pk_serde.clone(),
934                        watermark_col_serde.clone(),
935                        watermark_col_idx_in_pk,
936                    )),
937                )));
938
939                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
940                    full_key_filter_key_extractor,
941                    table_id_to_vnode,
942                    table_id_to_watermark_serde,
943                    HashMap::default(),
944                ));
945
946                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
947                    shared_buffer_batch.clone().into_forward_iter(),
948                    NonPkPrefixSkipWatermarkState::new(
949                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
950                        compaction_catalog_agent_ref,
951                    ),
952                );
953
954                iter.rewind().await.unwrap();
955                assert!(iter.is_valid());
956                for i in 5..10 {
957                    let (k, _v) = gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices);
958                    assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref());
959                    iter.next().await.unwrap();
960                }
961                assert!(!iter.is_valid());
962            }
963        }
964
965        {
966            // test multi vnode
967            let shared_buffer_batch = {
968                let mut kv_pairs = (0..10_i32)
969                    .map(|i| gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices))
970                    .collect_vec();
971
972                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
973                build_batch(kv_pairs.into_iter(), TABLE_ID)
974            };
975
976            {
977                // test watermark
978                let watermark = {
979                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
980                    serialize_pk(r1, &watermark_col_serde)
981                };
982
983                let read_watermark = ReadTableWatermark {
984                    direction: watermark_direction,
985                    vnode_watermarks: BTreeMap::from_iter(
986                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
987                    ),
988                };
989
990                let full_key_filter_key_extractor =
991                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
992
993                let table_id_to_vnode =
994                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));
995
996                let table_id_to_watermark_serde = HashMap::from_iter(once((
997                    TABLE_ID,
998                    Some((
999                        pk_serde.clone(),
1000                        watermark_col_serde.clone(),
1001                        watermark_col_idx_in_pk,
1002                    )),
1003                )));
1004
1005                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
1006                    full_key_filter_key_extractor,
1007                    table_id_to_vnode,
1008                    table_id_to_watermark_serde,
1009                    HashMap::default(),
1010                ));
1011
1012                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
1013                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
1014                    NonPkPrefixSkipWatermarkState::new(
1015                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
1016                        compaction_catalog_agent_ref,
1017                    ),
1018                );
1019
1020                iter.rewind().await.unwrap();
1021                assert!(iter.is_valid());
1022                let mut kv_pairs = (5..10_i32)
1023                    .map(|i| {
1024                        let (k, v) =
1025                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
1026                        (k, v)
1027                    })
1028                    .collect_vec();
1029                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1030                let mut index = 0;
1031                while iter.is_valid() {
1032                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
1033                    iter.next().await.unwrap();
1034                    index += 1;
1035                }
1036            }
1037
1038            {
1039                // test watermark
1040                let watermark = {
1041                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
1042                    serialize_pk(r1, &watermark_col_serde)
1043                };
1044
1045                let read_watermark = ReadTableWatermark {
1046                    direction: watermark_direction,
1047                    vnode_watermarks: BTreeMap::from_iter(
1048                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
1049                    ),
1050                };
1051
1052                let full_key_filter_key_extractor =
1053                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
1054
1055                let table_id_to_vnode =
1056                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));
1057
1058                let table_id_to_watermark_serde = HashMap::from_iter(once((
1059                    TABLE_ID,
1060                    Some((
1061                        pk_serde.clone(),
1062                        watermark_col_serde.clone(),
1063                        watermark_col_idx_in_pk,
1064                    )),
1065                )));
1066
1067                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
1068                    full_key_filter_key_extractor,
1069                    table_id_to_vnode,
1070                    table_id_to_watermark_serde,
1071                    HashMap::default(),
1072                ));
1073
1074                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
1075                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
1076                    NonPkPrefixSkipWatermarkState::new(
1077                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
1078                        compaction_catalog_agent_ref,
1079                    ),
1080                );
1081
1082                iter.rewind().await.unwrap();
1083                assert!(iter.is_valid());
1084                let mut kv_pairs = (5..10_i32)
1085                    .map(|i| {
1086                        let (k, v) =
1087                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
1088                        (k, v)
1089                    })
1090                    .collect_vec();
1091                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1092                let mut index = 0;
1093                while iter.is_valid() {
1094                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
1095                    iter.next().await.unwrap();
1096                    index += 1;
1097                }
1098            }
1099
1100            {
1101                // test watermark Descending
1102                let watermark = {
1103                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
1104                    serialize_pk(r1, &watermark_col_serde)
1105                };
1106
1107                let read_watermark = ReadTableWatermark {
1108                    direction: WatermarkDirection::Descending,
1109                    vnode_watermarks: BTreeMap::from_iter(
1110                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
1111                    ),
1112                };
1113
1114                let full_key_filter_key_extractor =
1115                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
1116
1117                let table_id_to_vnode =
1118                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));
1119
1120                let table_id_to_watermark_serde = HashMap::from_iter(once((
1121                    TABLE_ID,
1122                    Some((
1123                        pk_serde.clone(),
1124                        watermark_col_serde.clone(),
1125                        watermark_col_idx_in_pk,
1126                    )),
1127                )));
1128
1129                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
1130                    full_key_filter_key_extractor,
1131                    table_id_to_vnode,
1132                    table_id_to_watermark_serde,
1133                    HashMap::default(),
1134                ));
1135
1136                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
1137                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
1138                    NonPkPrefixSkipWatermarkState::new(
1139                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
1140                        compaction_catalog_agent_ref,
1141                    ),
1142                );
1143
1144                iter.rewind().await.unwrap();
1145                assert!(iter.is_valid());
1146                let mut kv_pairs = (0..=5_i32)
1147                    .map(|i| {
1148                        let (k, v) =
1149                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
1150                        (k, v)
1151                    })
1152                    .collect_vec();
1153                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1154                let mut index = 0;
1155                while iter.is_valid() {
1156                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
1157                    iter.next().await.unwrap();
1158                    index += 1;
1159                }
1160            }
1161        }
1162    }
1163
    /// Two tables merged into one stream: t1 carries a non-pk-prefix watermark
    /// (on pk index 1), t2 a pk-prefix watermark. First checks the
    /// non-pk-prefix iterator alone (t2 untouched), then stacks the pk-prefix
    /// iterator on top so each table is filtered by its own watermark kind.
    #[tokio::test]
    async fn test_mix_watermark() {
        // Serializes a 4-column row (col_2 is the watermark column) projected
        // onto `pk_indices`; the value encodes all columns for debuggability.
        fn gen_key_value(
            vnode: usize,
            col_0: i32,
            col_1: i32,
            col_2: i32,
            col_3: i32,
            pk_serde: &OrderedRowSerde,
            pk_indices: &[usize],
        ) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
            let r = OwnedRow::new(vec![
                Some(ScalarImpl::Int32(col_0)),
                Some(ScalarImpl::Int32(col_1)),
                Some(ScalarImpl::Int32(col_2)), // watermark column
                Some(ScalarImpl::Int32(col_3)),
            ]);

            let pk = r.project(pk_indices);

            let k1 = serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode));
            let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice(
                format!("{}-value-{}-{}-{}-{}", vnode, col_0, col_1, col_2, col_3).as_bytes(),
            ));

            (k1, v1)
        }

        let watermark_col_serde =
            OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]);
        // t1 pk = (col_0, col_2, col_3): watermark column at pk index 1.
        let t1_pk_serde = OrderedRowSerde::new(
            vec![DataType::Int32, DataType::Int32, DataType::Int32],
            vec![
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
        );

        let t1_pk_indices = vec![0, 2, 3];
        let t1_watermark_col_idx_in_pk = 1;

        // t2 pk = (col_0, col_1, col_2): col_0 is the pk prefix, so t2 uses a
        // plain pk-prefix watermark.
        let t2_pk_indices = vec![0, 1, 2];

        let t2_pk_serde = OrderedRowSerde::new(
            vec![DataType::Int32, DataType::Int32, DataType::Int32],
            vec![
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
        );

        let t1_id = TABLE_ID;
        let t2_id = TableId::from(t1_id.as_raw_id() + 1);

        // t1 data: rows alternate vnodes 0/1, col_0 descending, col_2 = 0..10.
        let t1_shared_buffer_batch = {
            let mut kv_pairs = (0..10_i32)
                .map(|i| {
                    gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        i,
                        i,
                        &t1_pk_serde,
                        &t1_pk_indices,
                    )
                })
                .collect_vec();

            kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
            build_batch(kv_pairs.into_iter(), t1_id).unwrap()
        };

        // t2 data: pk prefix col_0 runs 10 down to 1 across vnodes 0/1.
        let t2_shared_buffer_batch = {
            let mut kv_pairs = (0..10_i32)
                .map(|i| {
                    gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        0,
                        0,
                        &t2_pk_serde,
                        &t2_pk_indices,
                    )
                })
                .collect_vec();

            kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
            build_batch(kv_pairs.into_iter(), t2_id).unwrap()
        };

        // t1 watermark: serialized watermark-column value 5, ascending.
        let t1_watermark = {
            let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
            serialize_pk(r1, &watermark_col_serde)
        };

        let t1_read_watermark = ReadTableWatermark {
            direction: WatermarkDirection::Ascending,
            vnode_watermarks: BTreeMap::from_iter(
                (0..2).map(|i| (VirtualNode::from_index(i), t1_watermark.clone())),
            ),
        };

        // t2 watermark: pk-prefix value 5, ascending (filters col_0 < 5).
        let t2_watermark = {
            let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
            serialize_pk(r1, &watermark_col_serde)
        };

        let t2_read_watermark = ReadTableWatermark {
            direction: WatermarkDirection::Ascending,
            vnode_watermarks: BTreeMap::from_iter(
                (0..2).map(|i| (VirtualNode::from_index(i), t2_watermark.clone())),
            ),
        };

        {
            // Phase 1: non-pk-prefix iterator alone. Only t1 has a watermark
            // registered, so t2 must pass through completely untouched.
            let t1_iter = t1_shared_buffer_batch.clone().into_forward_iter();
            let t2_iter = t2_shared_buffer_batch.clone().into_forward_iter();
            let iter_vec = vec![t1_iter, t2_iter];
            let merge_iter = MergeIterator::new(iter_vec);

            let full_key_filter_key_extractor =
                FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

            // NOTE(review): only TABLE_ID (== t1_id) is registered here, not
            // t2_id; using t1_id literally would be clearer — confirm intent.
            let table_id_to_vnode =
                HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

            let table_id_to_watermark_serde = HashMap::from_iter(once((
                t1_id,
                Some((
                    t1_pk_serde.clone(),
                    watermark_col_serde.clone(),
                    t1_watermark_col_idx_in_pk,
                )),
            )));

            let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                full_key_filter_key_extractor,
                table_id_to_vnode,
                table_id_to_watermark_serde,
                HashMap::default(),
            ));

            let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                merge_iter,
                NonPkPrefixSkipWatermarkState::new(
                    BTreeMap::from_iter(once((TABLE_ID, t1_read_watermark.clone()))),
                    compaction_catalog_agent_ref,
                ),
            );

            iter.rewind().await.unwrap();
            assert!(iter.is_valid());
            // Expected t1 survivors: rows with col_2 >= 5.
            let mut t1_kv_pairs = (5..10_i32)
                .map(|i| {
                    let (k, v) = gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        i,
                        i,
                        &t1_pk_serde,
                        &t1_pk_indices,
                    );
                    (k, v)
                })
                .collect_vec();

            t1_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));

            // Expected t2 output: everything (no watermark applied to t2).
            let mut t2_kv_pairs = (0..10_i32)
                .map(|i| {
                    gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        0,
                        0,
                        &t2_pk_serde,
                        &t2_pk_indices,
                    )
                })
                .collect_vec();

            t2_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
            // t1 keys come first in the merged stream (smaller table id).
            let mut index = 0;
            for _ in 0..t1_kv_pairs.len() {
                assert!(t1_kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                iter.next().await.unwrap();
                index += 1;
            }

            assert!(iter.is_valid());
            assert_eq!(t1_kv_pairs.len(), index);

            index = 0;
            for _ in 0..t2_kv_pairs.len() {
                assert!(t2_kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                iter.next().await.unwrap();
                index += 1;
            }

            assert!(!iter.is_valid());
            assert_eq!(t2_kv_pairs.len(), index);
        }

        {
            // Phase 2: stack the pk-prefix iterator (t2 watermark) on top of
            // the non-pk-prefix iterator (t1 watermark).
            let t1_iter = t1_shared_buffer_batch.clone().into_forward_iter();
            let t2_iter = t2_shared_buffer_batch.clone().into_forward_iter();
            let iter_vec = vec![t1_iter, t2_iter];
            let merge_iter = MergeIterator::new(iter_vec);

            let full_key_filter_key_extractor =
                FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

            let table_id_to_vnode = HashMap::from_iter(
                vec![
                    (t1_id, VirtualNode::COUNT_FOR_TEST),
                    (t2_id, VirtualNode::COUNT_FOR_TEST),
                ]
                .into_iter(),
            );

            // t2 maps to None: it has no non-pk-prefix watermark serde.
            let table_id_to_watermark_serde = HashMap::from_iter(
                vec![
                    (
                        t1_id,
                        Some((
                            t1_pk_serde.clone(),
                            watermark_col_serde.clone(),
                            t1_watermark_col_idx_in_pk,
                        )),
                    ),
                    (t2_id, None),
                ]
                .into_iter(),
            );

            let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                full_key_filter_key_extractor,
                table_id_to_vnode,
                table_id_to_watermark_serde,
                HashMap::default(),
            ));

            // NOTE(review): TABLE_ID (== t1_id) below; t1_id would be clearer.
            let non_pk_prefix_iter = NonPkPrefixSkipWatermarkIterator::new(
                merge_iter,
                NonPkPrefixSkipWatermarkState::new(
                    BTreeMap::from_iter(once((t1_id, t1_read_watermark.clone()))),
                    compaction_catalog_agent_ref.clone(),
                ),
            );

            let mut mix_iter = PkPrefixSkipWatermarkIterator::new(
                non_pk_prefix_iter,
                PkPrefixSkipWatermarkState::new(BTreeMap::from_iter(once((
                    t2_id,
                    t2_read_watermark.clone(),
                )))),
            );

            mix_iter.rewind().await.unwrap();
            assert!(mix_iter.is_valid());

            // Expected t1 survivors: col_2 >= 5 (non-pk-prefix watermark).
            let mut t1_kv_pairs = (5..10_i32)
                .map(|i| {
                    let (k, v) = gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        i,
                        i,
                        &t1_pk_serde,
                        &t1_pk_indices,
                    );
                    (k, v)
                })
                .collect_vec();

            t1_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));

            // Expected t2 survivors: pk prefix col_0 = 10 - i >= 5, i.e.
            // i in 0..=5 (pk-prefix watermark).
            let mut t2_kv_pairs = (0..=5_i32)
                .map(|i| {
                    gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        0,
                        0,
                        &t2_pk_serde,
                        &t2_pk_indices,
                    )
                })
                .collect_vec();

            t2_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));

            let mut index = 0;
            for _ in 0..t1_kv_pairs.len() {
                assert!(
                    t1_kv_pairs[index].0.as_ref() == mix_iter.key().user_key.table_key.as_ref()
                );
                mix_iter.next().await.unwrap();
                index += 1;
            }

            assert!(mix_iter.is_valid());
            assert_eq!(t1_kv_pairs.len(), index);

            index = 0;

            for _ in 0..t2_kv_pairs.len() {
                assert!(
                    t2_kv_pairs[index].0.as_ref() == mix_iter.key().user_key.table_key.as_ref()
                );
                mix_iter.next().await.unwrap();
                index += 1;
            }

            assert!(!mix_iter.is_valid());
            assert_eq!(t2_kv_pairs.len(), index);
        }
    }
1491}