// risingwave_storage/hummock/iterator/skip_watermark.rs
1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::{BTreeMap, VecDeque};
17
18use bytes::Bytes;
19use risingwave_common::catalog::TableId;
20use risingwave_common::hash::VirtualNode;
21use risingwave_common::row::Row;
22use risingwave_common::types::Datum;
23use risingwave_common::util::row_serde::OrderedRowSerde;
24use risingwave_hummock_sdk::compaction_group::hummock_version_ext::safe_epoch_read_table_watermarks_impl;
25use risingwave_hummock_sdk::key::FullKey;
26use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
27use risingwave_hummock_sdk::table_watermark::{
28    ReadTableWatermark, TableWatermarks, WatermarkDirection,
29};
30
31use super::SkipWatermarkState;
32use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, ValueWatermarkColumnSerdeRef};
33use crate::hummock::HummockResult;
34use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
35use crate::hummock::value::HummockValue;
36use crate::monitor::StoreLocalStatistic;
37
/// A forward iterator adaptor that transparently skips key-value pairs which
/// are filtered out by table watermarks. The filtering decision is delegated
/// to the pluggable state `S` (an implementation of `SkipWatermarkState`),
/// while this wrapper tracks statistics about what was skipped.
pub struct SkipWatermarkIterator<I, S> {
    // The underlying forward iterator being wrapped.
    inner: I,
    // Watermark bookkeeping that decides whether the current key should be skipped.
    state: S,
    /// The stats of skipped key-value pairs for each table.
    skipped_entry_table_stats: TableStatsMap,
    /// The id of table currently undergoing processing.
    last_table_id: Option<TableId>,
    /// The stats of table currently undergoing processing.
    last_table_stats: TableStats,
}
48
impl<I: HummockIterator<Direction = Forward>, S: SkipWatermarkState> SkipWatermarkIterator<I, S> {
    /// Wraps `inner` with the given watermark `state`; skip statistics start empty.
    pub fn new(inner: I, state: S) -> Self {
        Self {
            inner,
            state,
            skipped_entry_table_stats: TableStatsMap::default(),
            last_table_id: None,
            last_table_stats: TableStats::default(),
        }
    }

    // Rebuild the state's pending watermark queue from its full watermark set.
    fn reset_watermark(&mut self) {
        self.state.reset_watermark();
    }

    // Clear all per-table skip statistics accumulated so far.
    fn reset_skipped_entry_table_stats(&mut self) {
        self.skipped_entry_table_stats = TableStatsMap::default();
        self.last_table_id = None;
        self.last_table_stats = TableStats::default();
    }

    /// Advance the key until iterator invalid or the current key will not be filtered by the latest watermark.
    /// Calling this method should ensure that the first remaining watermark has been advanced to the current key.
    ///
    /// Every skipped entry is accounted as a negative delta in the per-table
    /// skip statistics.
    async fn advance_key_and_watermark(&mut self) -> HummockResult<()> {
        // advance key and watermark in an interleave manner until nothing
        // changed after the method is called.
        while self.inner.is_valid() {
            if !self
                .state
                .should_delete(&self.inner.key(), self.inner.value())
            {
                break;
            }

            // The skipped key belongs to a different table than the one being
            // accumulated: flush the pending stats before switching tables.
            if self
                .last_table_id
                .is_none_or(|last_table_id| last_table_id != self.inner.key().user_key.table_id)
            {
                self.add_last_table_stats();
                self.last_table_id = Some(self.inner.key().user_key.table_id);
            }
            // Record the skipped entry as negative deltas for the current table.
            self.last_table_stats.total_key_count -= 1;
            self.last_table_stats.total_key_size -= self.inner.key().encoded_len() as i64;
            self.last_table_stats.total_value_size -= self.inner.value().encoded_len() as i64;

            self.inner.next().await?;
        }
        // Flush whatever was accumulated for the last processed table.
        self.add_last_table_stats();
        Ok(())
    }

    /// Merges the in-progress stats of the last processed table into
    /// `skipped_entry_table_stats` and resets the in-progress accumulators.
    /// No-op when no table is currently being tracked.
    fn add_last_table_stats(&mut self) {
        let Some(last_table_id) = self.last_table_id.take() else {
            return;
        };
        let delta = std::mem::take(&mut self.last_table_stats);
        let e = self
            .skipped_entry_table_stats
            .entry(last_table_id)
            .or_default();
        e.total_key_count += delta.total_key_count;
        e.total_key_size += delta.total_key_size;
        e.total_value_size += delta.total_value_size;
    }
}
116
impl<I: HummockIterator<Direction = Forward>, S: SkipWatermarkState> HummockIterator
    for SkipWatermarkIterator<I, S>
{
    type Direction = Forward;

    async fn next(&mut self) -> HummockResult<()> {
        self.inner.next().await?;
        // Check whether there is any remaining watermark and return early to
        // avoid calling the async `advance_key_and_watermark`, since in benchmark
        // performance downgrade is observed without this early return.
        if self.state.has_watermark() {
            self.advance_key_and_watermark().await?;
        }
        Ok(())
    }

    fn key(&self) -> FullKey<&[u8]> {
        self.inner.key()
    }

    fn value(&self) -> HummockValue<&[u8]> {
        self.inner.value()
    }

    fn is_valid(&self) -> bool {
        self.inner.is_valid()
    }

    async fn rewind(&mut self) -> HummockResult<()> {
        // Repositioning invalidates both the watermark queue and the skip
        // stats, so reset them before skipping any leading filtered keys.
        self.reset_watermark();
        self.reset_skipped_entry_table_stats();
        self.inner.rewind().await?;
        self.advance_key_and_watermark().await?;
        Ok(())
    }

    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
        // Same reset-then-advance protocol as `rewind`.
        self.reset_watermark();
        self.reset_skipped_entry_table_stats();
        self.inner.seek(key).await?;
        self.advance_key_and_watermark().await?;
        Ok(())
    }

    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
        // Contribute the skipped-entry deltas, then delegate to the inner iterator.
        add_table_stats_map(
            &mut stats.skipped_by_watermark_table_stats,
            &self.skipped_entry_table_stats,
        );
        self.inner.collect_local_statistic(stats)
    }

    fn value_meta(&self) -> ValueMeta {
        self.inner.value_meta()
    }
}
/// Watermark state that filters keys by comparing the serialized pk prefix
/// (the part of the table key after the vnode) directly against per-vnode
/// watermark bytes.
pub struct PkPrefixSkipWatermarkState {
    // Full set of read watermarks, keyed by table id.
    watermarks: BTreeMap<TableId, ReadTableWatermark>,
    // Watermarks not yet passed by the iterating key; kept sorted by
    // `(table_id, vnode)` ascending so only the front needs to be inspected.
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Bytes)>,
}
177
impl SkipWatermarkState for PkPrefixSkipWatermarkState {
    #[inline(always)]
    fn has_watermark(&self) -> bool {
        !self.remain_watermarks.is_empty()
    }

    /// Returns whether `key` should be skipped according to the front
    /// watermark. As a side effect, exhausted watermarks may be popped (via
    /// `advance_watermark`) when the key has moved past the front entry.
    fn should_delete(&mut self, key: &FullKey<&[u8]>, value: HummockValue<&[u8]>) -> bool {
        if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            let key_table_id = key.user_key.table_id;
            let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
            match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
                Ordering::Less => {
                    // Key is before the front watermark's (table_id, vnode): keep it.
                    return false;
                }
                Ordering::Equal => {
                    // Same (table_id, vnode): the watermark decides directly.
                    return direction.key_filter_by_watermark(inner_key, watermark);
                }
                Ordering::Greater => {
                    // The current key has advanced over the watermark.
                    // We may advance the watermark before advancing the key.
                    return self.advance_watermark(key, value);
                }
            }
        }
        false
    }

    /// Rebuilds the pending queue from the full watermark map, flattened to
    /// `(table_id, vnode, direction, watermark)` tuples. `BTreeMap` iteration
    /// order keeps the queue sorted by `(table_id, vnode)`.
    fn reset_watermark(&mut self) {
        self.remain_watermarks = self
            .watermarks
            .iter()
            .flat_map(|(table_id, read_watermarks)| {
                read_watermarks
                    .vnode_watermarks
                    .iter()
                    .map(|(vnode, watermarks)| {
                        (
                            *table_id,
                            *vnode,
                            read_watermarks.direction,
                            watermarks.clone(),
                        )
                    })
            })
            .collect();
    }

    /// Advance watermark until no watermark remains or the first watermark can possibly
    /// filter out the current or future key.
    ///
    /// Return a flag indicating whether the current key will be filtered by the current watermark.
    fn advance_watermark(&mut self, key: &FullKey<&[u8]>, _value: HummockValue<&[u8]>) -> bool {
        let key_table_id = key.user_key.table_id;
        let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
        while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
                Ordering::Less => {
                    // Watermark is behind the key: it can never match again, drop it.
                    self.remain_watermarks.pop_front();
                    continue;
                }
                Ordering::Equal => {
                    match direction {
                        WatermarkDirection::Ascending => {
                            match inner_key.cmp(watermark.as_ref()) {
                                Ordering::Less => {
                                    // The current key will be filtered by the watermark.
                                    // Return true to further advance the key.
                                    return true;
                                }
                                Ordering::Equal | Ordering::Greater => {
                                    // The current key has passed the watermark.
                                    // Advance the next watermark.
                                    self.remain_watermarks.pop_front();
                                    // Since it is impossible for a (table_id, vnode) tuple to have multiple
                                    // watermark, after the pop_front, the next (table_id, vnode) must have
                                    // exceeded the current key, and we can directly return and mark that the
                                    // current key is not filtered by the watermark at the front.
                                    #[cfg(debug_assertions)]
                                    {
                                        if let Some((next_table_id, next_vnode, _, _)) =
                                            self.remain_watermarks.front()
                                        {
                                            assert!(
                                                (next_table_id, next_vnode)
                                                    > (&key_table_id, &key_vnode)
                                            );
                                        }
                                    }
                                    return false;
                                }
                            }
                        }
                        WatermarkDirection::Descending => {
                            return match inner_key.cmp(watermark.as_ref()) {
                                // Current key as not reached the watermark. Just return.
                                Ordering::Less | Ordering::Equal => false,
                                // Current key will be filtered by the watermark.
                                // Return true to further advance the key.
                                Ordering::Greater => true,
                            };
                        }
                    }
                }
                Ordering::Greater => {
                    // Front watermark is ahead of the key: nothing to filter yet.
                    return false;
                }
            }
        }
        false
    }
}
289
290impl PkPrefixSkipWatermarkState {
291    pub fn new(watermarks: BTreeMap<TableId, ReadTableWatermark>) -> Self {
292        Self {
293            remain_watermarks: VecDeque::new(),
294            watermarks,
295        }
296    }
297
298    pub fn from_safe_epoch_watermarks(
299        safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
300    ) -> Self {
301        let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
302        Self::new(watermarks)
303    }
304}
305
/// Watermark state for tables whose watermark column is part of the primary
/// key but not its serialized prefix: keys must be deserialized with the
/// table's pk serde before the watermark column can be compared.
pub struct NonPkPrefixSkipWatermarkState {
    // Full set of read watermarks, keyed by table id.
    watermarks: BTreeMap<TableId, ReadTableWatermark>,
    // Watermarks not yet passed by the iterating key, sorted by (table_id, vnode);
    // the watermark here is an already-deserialized datum.
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Datum)>,
    // Catalog access used to look up per-table serdes lazily.
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,

    // Cached (pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk)
    // for the table currently being compared; invalidated when the front
    // watermark is popped or the state is reset.
    last_serde: Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
    // Last table id observed by `should_delete`.
    // NOTE(review): only written under an is_none_or guard; no visible reader
    // besides that guard — confirm whether it is intended to invalidate
    // `last_serde` on table change.
    last_table_id: Option<TableId>,
}
314
315impl NonPkPrefixSkipWatermarkState {
316    pub fn new(
317        watermarks: BTreeMap<TableId, ReadTableWatermark>,
318        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
319    ) -> Self {
320        Self {
321            remain_watermarks: VecDeque::new(),
322            watermarks,
323            compaction_catalog_agent_ref,
324            last_serde: None,
325            last_table_id: None,
326        }
327    }
328
329    pub fn from_safe_epoch_watermarks(
330        safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
331        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
332    ) -> Self {
333        let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
334        Self::new(watermarks, compaction_catalog_agent_ref)
335    }
336}
337
impl SkipWatermarkState for NonPkPrefixSkipWatermarkState {
    #[inline(always)]
    fn has_watermark(&self) -> bool {
        !self.remain_watermarks.is_empty()
    }

    /// Returns whether `key` should be skipped by deserializing its pk prefix
    /// and comparing the watermark column datum against the front watermark.
    /// May pop exhausted watermarks (via `advance_watermark`) as a side effect.
    fn should_delete(&mut self, key: &FullKey<&[u8]>, value: HummockValue<&[u8]>) -> bool {
        if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            let key_table_id = key.user_key.table_id;
            {
                // Track the table currently being processed.
                // NOTE(review): `last_serde` is NOT invalidated here on table
                // change; it is only reset when a watermark is popped or on
                // `reset_watermark`. Confirm the pop path always runs before a
                // stale serde could be reused for a different table.
                if self
                    .last_table_id
                    .is_none_or(|last_table_id| last_table_id != key_table_id)
                {
                    self.last_table_id = Some(key_table_id);
                }
            }

            let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
            match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
                Ordering::Less => {
                    // Key is before the front watermark's (table_id, vnode): keep it.
                    return false;
                }
                Ordering::Equal => {
                    // Lazily resolve and cache the serdes for this table.
                    if self.last_serde.is_none() {
                        self.last_serde =
                            self.compaction_catalog_agent_ref.watermark_serde(*table_id);
                    }
                    let (pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk) =
                        self.last_serde.as_ref().unwrap();
                    let row = pk_prefix_serde
                        .deserialize(inner_key)
                        .unwrap_or_else(|_| {
                            panic!("Failed to deserialize pk_prefix inner_key {:?} serde data_types {:?} order_types {:?}", inner_key, pk_prefix_serde.get_data_types(), pk_prefix_serde.get_order_types());
                        });
                    let watermark_col_in_pk = row.datum_at(*watermark_col_idx_in_pk);
                    return direction.datum_filter_by_watermark(
                        watermark_col_in_pk,
                        watermark,
                        watermark_col_serde.get_order_types()[0],
                    );
                }
                Ordering::Greater => {
                    // The current key has advanced over the watermark.
                    // We may advance the watermark before advancing the key.
                    return self.advance_watermark(key, value);
                }
            }
        }
        false
    }

    /// Rebuilds the pending queue from the full watermark map, deserializing
    /// each watermark's bytes into a datum up front. Tables without a
    /// watermark serde are skipped entirely.
    fn reset_watermark(&mut self) {
        self.last_serde = None;
        self.remain_watermarks = self
            .watermarks
            .iter()
            .flat_map(|(table_id, read_watermarks)| {
                let watermark_serde = self.compaction_catalog_agent_ref.watermark_serde(*table_id).map(|(_pk_serde, watermark_serde, _watermark_col_idx_in_pk)| watermark_serde);

                read_watermarks
                    .vnode_watermarks
                    .iter()
                    .flat_map(move |(vnode, watermarks)| {
                        // TODO(ttl): if the watermark column is in the value, we may get a `None` here, support it.
                        let watermark_serde = watermark_serde.as_ref()?;
                        Some((
                            *table_id,
                            *vnode,
                            read_watermarks.direction,
                            {
                                let row = watermark_serde
                                .deserialize(watermarks).unwrap_or_else(|_| {
                                    panic!("Failed to deserialize watermark {:?} serde data_types {:?} order_types {:?}", watermarks, watermark_serde.get_data_types(), watermark_serde.get_order_types());
                                });
                                row[0].clone()
                            },
                        ))
                    })
            })
            .collect();
    }

    /// Pops watermarks the key has already passed, then decides whether the
    /// current key is filtered by the (possibly new) front watermark.
    fn advance_watermark(&mut self, key: &FullKey<&[u8]>, _value: HummockValue<&[u8]>) -> bool {
        let key_table_id = key.user_key.table_id;
        let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
        while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
                Ordering::Less => {
                    // Watermark is behind the key: drop it and invalidate the
                    // cached serde, which may belong to the popped table.
                    self.remain_watermarks.pop_front();
                    self.last_serde = None;
                    continue;
                }
                Ordering::Equal => {
                    // Lazily resolve and cache the serdes for this table.
                    if self.last_serde.is_none() {
                        self.last_serde =
                            self.compaction_catalog_agent_ref.watermark_serde(*table_id);
                    }
                    let (pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk) =
                        self.last_serde.as_ref().unwrap();

                    let row = pk_prefix_serde
                        .deserialize(inner_key)
                        .unwrap_or_else(|_| {
                            panic!("Failed to deserialize pk_prefix inner_key {:?} serde data_types {:?} order_types {:?}", inner_key, pk_prefix_serde.get_data_types(), pk_prefix_serde.get_order_types());
                        });
                    let watermark_col_in_pk = row.datum_at(*watermark_col_idx_in_pk);

                    return direction.datum_filter_by_watermark(
                        watermark_col_in_pk,
                        watermark,
                        watermark_col_serde.get_order_types()[0],
                    );
                }
                Ordering::Greater => {
                    // Front watermark is ahead of the key: nothing to filter yet.
                    return false;
                }
            }
        }
        false
    }
}
460
/// Skip-watermark iterator driven by direct pk-prefix byte comparison.
pub type PkPrefixSkipWatermarkIterator<I> = SkipWatermarkIterator<I, PkPrefixSkipWatermarkState>;

/// Skip-watermark iterator for watermark columns inside the pk but not at its prefix.
pub type NonPkPrefixSkipWatermarkIterator<I> =
    SkipWatermarkIterator<I, NonPkPrefixSkipWatermarkState>;

/// Skip-watermark iterator for watermark columns stored in the row value.
pub type ValueSkipWatermarkIterator<I> = SkipWatermarkIterator<I, ValueSkipWatermarkState>;
467
/// Watermark state for tables whose watermark column lives in the row value:
/// the value (not the key) is deserialized to extract the column under watermark.
pub struct ValueSkipWatermarkState {
    /// Full set of read watermarks, keyed by table id.
    pub watermarks: BTreeMap<TableId, ReadTableWatermark>,
    // Watermarks not yet passed by the iterating key, sorted by (table_id, vnode).
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Bytes)>,
    // Catalog access used to look up the value-watermark serde lazily.
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    // Cached value-watermark serde for the table currently being compared.
    last_serde: Option<ValueWatermarkColumnSerdeRef>,
    // Last table id observed by `should_delete`; updated but has no other
    // visible reader in this file (see NOTE in `should_delete`).
    last_table_id: Option<TableId>,
}
475
476impl ValueSkipWatermarkState {
477    pub fn new(
478        watermarks: BTreeMap<TableId, ReadTableWatermark>,
479        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
480    ) -> Self {
481        Self {
482            remain_watermarks: VecDeque::new(),
483            watermarks,
484            compaction_catalog_agent_ref,
485            last_serde: None,
486            last_table_id: None,
487        }
488    }
489
490    pub fn from_safe_epoch_watermarks(
491        safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
492        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
493    ) -> Self {
494        let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
495        Self::new(watermarks, compaction_catalog_agent_ref)
496    }
497
498    pub fn may_delete(&self, key: &FullKey<&[u8]>) -> bool {
499        let table_id = key.user_key.table_id;
500        self.watermarks.contains_key(&table_id)
501    }
502}
503
impl SkipWatermarkState for ValueSkipWatermarkState {
    #[inline(always)]
    fn has_watermark(&self) -> bool {
        !self.remain_watermarks.is_empty()
    }

    /// Returns whether `key` should be skipped by deserializing the watermark
    /// column out of the row value and comparing it against the front
    /// watermark. Non-`Put` values are never filtered. Deserialization or
    /// lookup failures are logged and treated as "keep the key".
    fn should_delete(&mut self, key: &FullKey<&[u8]>, value: HummockValue<&[u8]>) -> bool {
        if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            let key_table_id = key.user_key.table_id;
            let key_vnode = key.user_key.table_key.vnode_part();
            // Track the table currently being processed.
            // NOTE(review): unlike the pop path, this does not invalidate
            // `last_serde` on table change — confirm intent.
            if self
                .last_table_id
                .is_none_or(|last_table_id| last_table_id != key_table_id)
            {
                self.last_table_id = Some(key_table_id);
            }
            match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
                Ordering::Less => {
                    // Key is before the front watermark's (table_id, vnode): keep it.
                    return false;
                }
                Ordering::Equal => {
                    // Only `Put` carries a row payload to inspect; deletes are kept.
                    let HummockValue::Put(value) = value else {
                        return false;
                    };
                    // Lazily resolve and cache the value-watermark serde.
                    if self.last_serde.is_none() {
                        self.last_serde = self
                            .compaction_catalog_agent_ref
                            .value_watermark_serde(*table_id);
                    }
                    let last_serde = self.last_serde.as_ref().unwrap();
                    let Ok(watermark_column_value) = last_serde.deserialize(value) else {
                        tracing::error!(
                            ?table_id,
                            ?vnode,
                            ?value,
                            "Failed to deserialize watermark column"
                        );
                        return false;
                    };
                    // NOTE(review): the equivalent branch in `advance_watermark`
                    // logs this condition at `warn!` level — confirm which level
                    // is intended.
                    let Some(watermark_column_value) = watermark_column_value else {
                        tracing::debug!(
                            ?table_id,
                            ?vnode,
                            ?value,
                            "The specified watermark column was not found in the value, likely due to the use of a non-current catalog. Restarting the compactor should resolve this issue."
                        );
                        return false;
                    };
                    return direction.key_filter_by_watermark(&watermark_column_value, watermark);
                }
                Ordering::Greater => {
                    // The current key has advanced over the watermark.
                    // We may advance the watermark before advancing the key.
                    return self.advance_watermark(key, value);
                }
            }
        }
        false
    }

    /// Rebuilds the pending queue from the full watermark map, flattened to
    /// `(table_id, vnode, direction, watermark)` tuples sorted by (table_id, vnode).
    fn reset_watermark(&mut self) {
        self.last_serde = None;
        self.remain_watermarks = self
            .watermarks
            .iter()
            .flat_map(|(table_id, read_watermarks)| {
                read_watermarks
                    .vnode_watermarks
                    .iter()
                    .map(|(vnode, watermarks)| {
                        (
                            *table_id,
                            *vnode,
                            read_watermarks.direction,
                            watermarks.clone(),
                        )
                    })
            })
            .collect();
    }

    /// Pops watermarks the key has already passed, then decides whether the
    /// current key's value-watermark column is filtered by the front watermark.
    fn advance_watermark(&mut self, key: &FullKey<&[u8]>, value: HummockValue<&[u8]>) -> bool {
        let key_table_id = key.user_key.table_id;
        let key_vnode = key.user_key.table_key.vnode_part();
        while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
                Ordering::Less => {
                    // Watermark is behind the key: drop it and invalidate the
                    // cached serde, which may belong to the popped table.
                    self.remain_watermarks.pop_front();
                    self.last_serde = None;
                    continue;
                }
                Ordering::Equal => {
                    // Only `Put` carries a row payload to inspect; deletes are kept.
                    let HummockValue::Put(value) = value else {
                        return false;
                    };
                    // Lazily resolve and cache the value-watermark serde.
                    if self.last_serde.is_none() {
                        self.last_serde = self
                            .compaction_catalog_agent_ref
                            .value_watermark_serde(*table_id);
                    }
                    let last_serde = self.last_serde.as_ref().unwrap();
                    let Ok(watermark_column_value) = last_serde.deserialize(value) else {
                        tracing::error!(
                            ?table_id,
                            ?vnode,
                            ?value,
                            "Failed to deserialize watermark column."
                        );
                        return false;
                    };
                    let Some(watermark_column_value) = watermark_column_value else {
                        tracing::warn!(
                            ?table_id,
                            ?vnode,
                            ?value,
                            "The specified watermark column was not found in the value, likely due to the use of a non-current catalog. Restarting the compactor should resolve this issue."
                        );
                        return false;
                    };
                    return direction.key_filter_by_watermark(&watermark_column_value, watermark);
                }
                Ordering::Greater => {
                    // Front watermark is ahead of the key: nothing to filter yet.
                    return false;
                }
            }
        }
        false
    }
}
633
634#[cfg(test)]
635mod tests {
636    use std::collections::{BTreeMap, HashMap};
637    use std::iter::{empty, once};
638    use std::sync::Arc;
639
640    use bytes::Bytes;
641    use itertools::Itertools;
642    use risingwave_common::catalog::TableId;
643    use risingwave_common::hash::VirtualNode;
644    use risingwave_common::row::{OwnedRow, RowExt};
645    use risingwave_common::types::{DataType, ScalarImpl};
646    use risingwave_common::util::epoch::test_epoch;
647    use risingwave_common::util::row_serde::OrderedRowSerde;
648    use risingwave_common::util::sort_util::OrderType;
649    use risingwave_hummock_sdk::EpochWithGap;
650    use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey, gen_key_from_str};
651    use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, WatermarkDirection};
652
653    use super::PkPrefixSkipWatermarkState;
654    use crate::compaction_catalog_manager::{
655        CompactionCatalogAgent, FilterKeyExtractorImpl, FullKeyFilterKeyExtractor,
656    };
657    use crate::hummock::iterator::{
658        HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
659        NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator,
660    };
661    use crate::hummock::shared_buffer::shared_buffer_batch::{
662        SharedBufferBatch, SharedBufferValue,
663    };
664    use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode};
665
    // Epoch and table id shared by all test cases in this module.
    const EPOCH: u64 = test_epoch(1);
    const TABLE_ID: TableId = TableId::new(233);
668
    /// Asserts that `second` yields exactly the same key-value sequence as
    /// `first` (the expected, pre-filtered iterator); `first == None` means an
    /// empty sequence is expected. When `seek_key` is `Some((vnode, key_index))`,
    /// both iterators are positioned via `seek` instead of `rewind` first.
    async fn assert_iter_eq(
        mut first: Option<impl HummockIterator>,
        mut second: impl HummockIterator,
        seek_key: Option<(usize, usize)>,
    ) {
        if let Some((vnode, key_index)) = seek_key {
            // Build the full seek key from the test's (vnode, index) scheme.
            let (seek_key, _) = gen_key_value(vnode, key_index);
            let full_key = FullKey {
                user_key: UserKey {
                    table_id: TABLE_ID,
                    table_key: seek_key,
                },
                epoch_with_gap: EpochWithGap::new_from_epoch(EPOCH),
            };
            if let Some(first) = &mut first {
                first.seek(full_key.to_ref()).await.unwrap();
            }
            second.seek(full_key.to_ref()).await.unwrap()
        } else {
            if let Some(first) = &mut first {
                first.rewind().await.unwrap();
            }
            second.rewind().await.unwrap();
        }

        // Lock-step comparison: every expected entry must match in key and value.
        if let Some(first) = &mut first {
            while first.is_valid() {
                assert!(second.is_valid());
                let first_key = first.key();
                let second_key = second.key();
                assert_eq!(first_key, second_key);
                assert_eq!(first.value(), second.value());
                first.next().await.unwrap();
                second.next().await.unwrap();
            }
        }
        // The iterator under test must be exhausted too (no extra entries).
        assert!(!second.is_valid());
    }
707
708    fn build_batch(
709        pairs: impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)>,
710        table_id: TableId,
711    ) -> Option<SharedBufferBatch> {
712        let pairs: Vec<_> = pairs.collect();
713        if pairs.is_empty() {
714            None
715        } else {
716            Some(SharedBufferBatch::for_test(pairs, EPOCH, table_id))
717        }
718    }
719
720    fn filter_with_watermarks(
721        iter: impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)>,
722        table_watermarks: ReadTableWatermark,
723    ) -> impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)> {
724        iter.filter(move |(key, _)| {
725            if let Some(watermark) = table_watermarks.vnode_watermarks.get(&key.vnode_part()) {
726                !table_watermarks
727                    .direction
728                    .key_filter_by_watermark(key.key_part(), watermark)
729            } else {
730                true
731            }
732        })
733    }
734
735    fn gen_inner_key(index: usize) -> String {
736        format!("key{:5}", index)
737    }
738
    /// Drives the pk-prefix skip iterator over a fixed set of (vnode, index)
    /// keys with the given per-vnode `watermarks` and `direction`, comparing
    /// its output against a reference-filtered batch under `rewind`, `seek`
    /// to every existing key, and `seek` past the last key.
    async fn test_watermark(
        watermarks: impl IntoIterator<Item = (usize, usize)>,
        direction: WatermarkDirection,
    ) {
        // Fixed key layout: (vnode, key_index) pairs spread over several vnodes.
        let test_index: [(usize, usize); 7] =
            [(0, 2), (0, 3), (0, 4), (1, 1), (1, 3), (4, 2), (8, 1)];
        let items = test_index
            .iter()
            .map(|(vnode, key_index)| gen_key_value(*vnode, *key_index))
            .collect_vec();

        let read_watermark = ReadTableWatermark {
            direction,
            vnode_watermarks: BTreeMap::from_iter(watermarks.into_iter().map(
                |(vnode, key_index)| {
                    (
                        VirtualNode::from_index(vnode),
                        Bytes::from(gen_inner_key(key_index)),
                    )
                },
            )),
        };

        // Builds a fresh (expected, under-test) iterator pair; iterators are
        // consumed by each comparison so they must be re-created per check.
        let gen_iters = || {
            // Expected output: the batch pre-filtered by the reference filter.
            let batch = build_batch(
                filter_with_watermarks(items.clone().into_iter(), read_watermark.clone()),
                TABLE_ID,
            );

            // Under test: the unfiltered batch wrapped in the skip iterator.
            let iter = PkPrefixSkipWatermarkIterator::new(
                build_batch(items.clone().into_iter(), TABLE_ID)
                    .unwrap()
                    .into_forward_iter(),
                PkPrefixSkipWatermarkState::new(BTreeMap::from_iter(once((
                    TABLE_ID,
                    read_watermark.clone(),
                )))),
            );
            (batch.map(|batch| batch.into_forward_iter()), iter)
        };
        // Full-scan comparison via rewind.
        let (first, second) = gen_iters();
        assert_iter_eq(first, second, None).await;
        // Seek to every key present in the layout.
        for (vnode, key_index) in &test_index {
            let (first, second) = gen_iters();
            assert_iter_eq(first, second, Some((*vnode, *key_index))).await;
        }
        // Seek past the last key: both iterators must be exhausted.
        let (last_vnode, last_key_index) = test_index.last().unwrap();
        let (first, second) = gen_iters();
        assert_iter_eq(first, second, Some((*last_vnode, last_key_index + 1))).await;
    }
789
790    fn gen_key_value(vnode: usize, index: usize) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
791        (
792            gen_key_from_str(VirtualNode::from_index(vnode), &gen_inner_key(index)),
793            SharedBufferValue::Insert(Bytes::copy_from_slice(
794                format!("{}-value-{}", vnode, index).as_bytes(),
795            )),
796        )
797    }
798
799    #[tokio::test]
800    async fn test_no_watermark() {
801        test_watermark(empty(), WatermarkDirection::Ascending).await;
802        test_watermark(empty(), WatermarkDirection::Descending).await;
803    }
804
805    #[tokio::test]
806    async fn test_too_low_watermark() {
807        test_watermark(vec![(0, 0)], WatermarkDirection::Ascending).await;
808        test_watermark(vec![(0, 10)], WatermarkDirection::Descending).await;
809    }
810
811    #[tokio::test]
812    async fn test_single_watermark() {
813        test_watermark(vec![(0, 3)], WatermarkDirection::Ascending).await;
814        test_watermark(vec![(0, 3)], WatermarkDirection::Descending).await;
815    }
816
817    #[tokio::test]
818    async fn test_watermark_vnode_no_data() {
819        test_watermark(vec![(3, 3)], WatermarkDirection::Ascending).await;
820        test_watermark(vec![(3, 3)], WatermarkDirection::Descending).await;
821    }
822
823    #[tokio::test]
824    async fn test_filter_all() {
825        test_watermark(
826            vec![(0, 5), (1, 4), (2, 0), (4, 3), (8, 2)],
827            WatermarkDirection::Ascending,
828        )
829        .await;
830        test_watermark(
831            vec![(0, 0), (1, 0), (2, 0), (4, 0), (8, 0)],
832            WatermarkDirection::Descending,
833        )
834        .await;
835    }
836
837    #[tokio::test]
838    async fn test_advance_multi_vnode() {
839        test_watermark(vec![(1, 2), (8, 0)], WatermarkDirection::Ascending).await;
840    }
841
842    #[tokio::test]
843    async fn test_non_pk_prefix_watermark() {
844        fn gen_key_value(
845            vnode: usize,
846            col_0: i32,
847            col_1: i32,
848            col_2: i32,
849            col_3: i32,
850            pk_serde: &OrderedRowSerde,
851            pk_indices: &[usize],
852        ) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
853            let r = OwnedRow::new(vec![
854                Some(ScalarImpl::Int32(col_0)),
855                Some(ScalarImpl::Int32(col_1)),
856                Some(ScalarImpl::Int32(col_2)), // watermark column
857                Some(ScalarImpl::Int32(col_3)),
858            ]);
859
860            let pk = r.project(pk_indices);
861
862            let k1 = serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode));
863            let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice(
864                format!("{}-value-{}", vnode, col_2).as_bytes(),
865            ));
866            (k1, v1)
867        }
868
869        let watermark_direction = WatermarkDirection::Ascending;
870
871        let watermark_col_serde =
872            OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]);
873        let pk_serde = OrderedRowSerde::new(
874            vec![DataType::Int32, DataType::Int32, DataType::Int32],
875            vec![
876                OrderType::ascending(),
877                OrderType::ascending(),
878                OrderType::ascending(),
879            ],
880        );
881
882        let pk_indices = vec![0, 2, 3];
883        let watermark_col_idx_in_pk = 1;
884
885        {
886            // test single vnode
887            let shared_buffer_batch = {
888                let mut kv_pairs = (0..10)
889                    .map(|i| gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices))
890                    .collect_vec();
891                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
892                build_batch(kv_pairs.into_iter(), TABLE_ID)
893            }
894            .unwrap();
895
896            {
897                // empty read watermark
898                let read_watermark = ReadTableWatermark {
899                    direction: watermark_direction,
900                    vnode_watermarks: BTreeMap::default(),
901                };
902
903                let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![TABLE_ID]);
904
905                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
906                    shared_buffer_batch.clone().into_forward_iter(),
907                    NonPkPrefixSkipWatermarkState::new(
908                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
909                        compaction_catalog_agent_ref,
910                    ),
911                );
912
913                iter.rewind().await.unwrap();
914                assert!(iter.is_valid());
915                for i in 0..10 {
916                    let (k, _v) = gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices);
917                    assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref());
918                    iter.next().await.unwrap();
919                }
920                assert!(!iter.is_valid());
921            }
922
923            {
924                // test watermark
925                let watermark = {
926                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
927                    serialize_pk(r1, &watermark_col_serde)
928                };
929
930                let read_watermark = ReadTableWatermark {
931                    direction: watermark_direction,
932                    vnode_watermarks: BTreeMap::from_iter(once((
933                        VirtualNode::from_index(0),
934                        watermark.clone(),
935                    ))),
936                };
937
938                let full_key_filter_key_extractor =
939                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
940
941                let table_id_to_vnode =
942                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));
943
944                let table_id_to_watermark_serde = HashMap::from_iter(once((
945                    TABLE_ID,
946                    Some((
947                        pk_serde.clone(),
948                        watermark_col_serde.clone(),
949                        watermark_col_idx_in_pk,
950                    )),
951                )));
952
953                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
954                    full_key_filter_key_extractor,
955                    table_id_to_vnode,
956                    table_id_to_watermark_serde,
957                    HashMap::default(),
958                ));
959
960                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
961                    shared_buffer_batch.clone().into_forward_iter(),
962                    NonPkPrefixSkipWatermarkState::new(
963                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
964                        compaction_catalog_agent_ref,
965                    ),
966                );
967
968                iter.rewind().await.unwrap();
969                assert!(iter.is_valid());
970                for i in 5..10 {
971                    let (k, _v) = gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices);
972                    assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref());
973                    iter.next().await.unwrap();
974                }
975                assert!(!iter.is_valid());
976            }
977        }
978
979        {
980            // test multi vnode
981            let shared_buffer_batch = {
982                let mut kv_pairs = (0..10_i32)
983                    .map(|i| gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices))
984                    .collect_vec();
985
986                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
987                build_batch(kv_pairs.into_iter(), TABLE_ID)
988            };
989
990            {
991                // test watermark
992                let watermark = {
993                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
994                    serialize_pk(r1, &watermark_col_serde)
995                };
996
997                let read_watermark = ReadTableWatermark {
998                    direction: watermark_direction,
999                    vnode_watermarks: BTreeMap::from_iter(
1000                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
1001                    ),
1002                };
1003
1004                let full_key_filter_key_extractor =
1005                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
1006
1007                let table_id_to_vnode =
1008                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));
1009
1010                let table_id_to_watermark_serde = HashMap::from_iter(once((
1011                    TABLE_ID,
1012                    Some((
1013                        pk_serde.clone(),
1014                        watermark_col_serde.clone(),
1015                        watermark_col_idx_in_pk,
1016                    )),
1017                )));
1018
1019                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
1020                    full_key_filter_key_extractor,
1021                    table_id_to_vnode,
1022                    table_id_to_watermark_serde,
1023                    HashMap::default(),
1024                ));
1025
1026                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
1027                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
1028                    NonPkPrefixSkipWatermarkState::new(
1029                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
1030                        compaction_catalog_agent_ref,
1031                    ),
1032                );
1033
1034                iter.rewind().await.unwrap();
1035                assert!(iter.is_valid());
1036                let mut kv_pairs = (5..10_i32)
1037                    .map(|i| {
1038                        let (k, v) =
1039                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
1040                        (k, v)
1041                    })
1042                    .collect_vec();
1043                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1044                let mut index = 0;
1045                while iter.is_valid() {
1046                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
1047                    iter.next().await.unwrap();
1048                    index += 1;
1049                }
1050            }
1051
1052            {
1053                // test watermark
1054                let watermark = {
1055                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
1056                    serialize_pk(r1, &watermark_col_serde)
1057                };
1058
1059                let read_watermark = ReadTableWatermark {
1060                    direction: watermark_direction,
1061                    vnode_watermarks: BTreeMap::from_iter(
1062                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
1063                    ),
1064                };
1065
1066                let full_key_filter_key_extractor =
1067                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
1068
1069                let table_id_to_vnode =
1070                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));
1071
1072                let table_id_to_watermark_serde = HashMap::from_iter(once((
1073                    TABLE_ID,
1074                    Some((
1075                        pk_serde.clone(),
1076                        watermark_col_serde.clone(),
1077                        watermark_col_idx_in_pk,
1078                    )),
1079                )));
1080
1081                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
1082                    full_key_filter_key_extractor,
1083                    table_id_to_vnode,
1084                    table_id_to_watermark_serde,
1085                    HashMap::default(),
1086                ));
1087
1088                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
1089                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
1090                    NonPkPrefixSkipWatermarkState::new(
1091                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
1092                        compaction_catalog_agent_ref,
1093                    ),
1094                );
1095
1096                iter.rewind().await.unwrap();
1097                assert!(iter.is_valid());
1098                let mut kv_pairs = (5..10_i32)
1099                    .map(|i| {
1100                        let (k, v) =
1101                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
1102                        (k, v)
1103                    })
1104                    .collect_vec();
1105                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1106                let mut index = 0;
1107                while iter.is_valid() {
1108                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
1109                    iter.next().await.unwrap();
1110                    index += 1;
1111                }
1112            }
1113
1114            {
1115                // test watermark Descending
1116                let watermark = {
1117                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
1118                    serialize_pk(r1, &watermark_col_serde)
1119                };
1120
1121                let read_watermark = ReadTableWatermark {
1122                    direction: WatermarkDirection::Descending,
1123                    vnode_watermarks: BTreeMap::from_iter(
1124                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
1125                    ),
1126                };
1127
1128                let full_key_filter_key_extractor =
1129                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
1130
1131                let table_id_to_vnode =
1132                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));
1133
1134                let table_id_to_watermark_serde = HashMap::from_iter(once((
1135                    TABLE_ID,
1136                    Some((
1137                        pk_serde.clone(),
1138                        watermark_col_serde.clone(),
1139                        watermark_col_idx_in_pk,
1140                    )),
1141                )));
1142
1143                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
1144                    full_key_filter_key_extractor,
1145                    table_id_to_vnode,
1146                    table_id_to_watermark_serde,
1147                    HashMap::default(),
1148                ));
1149
1150                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
1151                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
1152                    NonPkPrefixSkipWatermarkState::new(
1153                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
1154                        compaction_catalog_agent_ref,
1155                    ),
1156                );
1157
1158                iter.rewind().await.unwrap();
1159                assert!(iter.is_valid());
1160                let mut kv_pairs = (0..=5_i32)
1161                    .map(|i| {
1162                        let (k, v) =
1163                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
1164                        (k, v)
1165                    })
1166                    .collect_vec();
1167                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1168                let mut index = 0;
1169                while iter.is_valid() {
1170                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
1171                    iter.next().await.unwrap();
1172                    index += 1;
1173                }
1174            }
1175        }
1176    }
1177
    /// Tests stacking `PkPrefixSkipWatermarkIterator` (filtering t2) on top of
    /// `NonPkPrefixSkipWatermarkIterator` (filtering t1) over a merge of two
    /// tables' batches, plus the non-pk-prefix layer alone.
    #[tokio::test]
    async fn test_mix_watermark() {
        // Builds a key from a 4-column row projected by `pk_indices`; the value
        // encodes all four columns so entries are unique across tables and rows.
        fn gen_key_value(
            vnode: usize,
            col_0: i32,
            col_1: i32,
            col_2: i32,
            col_3: i32,
            pk_serde: &OrderedRowSerde,
            pk_indices: &[usize],
        ) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
            let r = OwnedRow::new(vec![
                Some(ScalarImpl::Int32(col_0)),
                Some(ScalarImpl::Int32(col_1)),
                Some(ScalarImpl::Int32(col_2)), // watermark column
                Some(ScalarImpl::Int32(col_3)),
            ]);

            let pk = r.project(pk_indices);

            let k1 = serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode));
            let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice(
                format!("{}-value-{}-{}-{}-{}", vnode, col_0, col_1, col_2, col_3).as_bytes(),
            ));

            (k1, v1)
        }

        let watermark_col_serde =
            OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]);
        // t1's pk is (col_0, col_2, col_3): its watermark column (col_2) sits
        // at pk position 1, i.e. NOT a pk prefix.
        let t1_pk_serde = OrderedRowSerde::new(
            vec![DataType::Int32, DataType::Int32, DataType::Int32],
            vec![
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
        );

        let t1_pk_indices = vec![0, 2, 3];
        let t1_watermark_col_idx_in_pk = 1;

        // t2's pk is (col_0, col_1, col_2); its watermark is applied via the
        // pk-prefix iterator in the second scope below.
        let t2_pk_indices = vec![0, 1, 2];

        let t2_pk_serde = OrderedRowSerde::new(
            vec![DataType::Int32, DataType::Int32, DataType::Int32],
            vec![
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
        );

        let t1_id = TABLE_ID;
        let t2_id = TableId::from(t1_id.as_raw_id() + 1);

        // t1 data: vnode alternates 0/1, col_0 = 10 - i, watermark col col_2 = i.
        let t1_shared_buffer_batch = {
            let mut kv_pairs = (0..10_i32)
                .map(|i| {
                    gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        i,
                        i,
                        &t1_pk_serde,
                        &t1_pk_indices,
                    )
                })
                .collect_vec();

            kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
            build_batch(kv_pairs.into_iter(), t1_id).unwrap()
        };

        // t2 data: same vnode pattern, pk-leading col_0 = 10 - i, cols 2/3 zero.
        let t2_shared_buffer_batch = {
            let mut kv_pairs = (0..10_i32)
                .map(|i| {
                    gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        0,
                        0,
                        &t2_pk_serde,
                        &t2_pk_indices,
                    )
                })
                .collect_vec();

            kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
            build_batch(kv_pairs.into_iter(), t2_id).unwrap()
        };

        // Serialized watermark value `5` for t1, applied to vnodes 0 and 1.
        let t1_watermark = {
            let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
            serialize_pk(r1, &watermark_col_serde)
        };

        let t1_read_watermark = ReadTableWatermark {
            direction: WatermarkDirection::Ascending,
            vnode_watermarks: BTreeMap::from_iter(
                (0..2).map(|i| (VirtualNode::from_index(i), t1_watermark.clone())),
            ),
        };

        // Same watermark value `5` for t2, also on vnodes 0 and 1.
        let t2_watermark = {
            let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
            serialize_pk(r1, &watermark_col_serde)
        };

        let t2_read_watermark = ReadTableWatermark {
            direction: WatermarkDirection::Ascending,
            vnode_watermarks: BTreeMap::from_iter(
                (0..2).map(|i| (VirtualNode::from_index(i), t2_watermark.clone())),
            ),
        };

        {
            // test non pk prefix watermark: only t1 has a watermark entry, so
            // t1 is filtered to col_2 in 5..10 while all of t2 passes through.
            let t1_iter = t1_shared_buffer_batch.clone().into_forward_iter();
            let t2_iter = t2_shared_buffer_batch.clone().into_forward_iter();
            let iter_vec = vec![t1_iter, t2_iter];
            let merge_iter = MergeIterator::new(iter_vec);

            let full_key_filter_key_extractor =
                FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

            // NOTE(review): only t1 (TABLE_ID) is registered here; presumably
            // t2 is never consulted because it has no watermark entry — confirm.
            let table_id_to_vnode =
                HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

            let table_id_to_watermark_serde = HashMap::from_iter(once((
                t1_id,
                Some((
                    t1_pk_serde.clone(),
                    watermark_col_serde.clone(),
                    t1_watermark_col_idx_in_pk,
                )),
            )));

            let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                full_key_filter_key_extractor,
                table_id_to_vnode,
                table_id_to_watermark_serde,
                HashMap::default(),
            ));

            let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                merge_iter,
                NonPkPrefixSkipWatermarkState::new(
                    BTreeMap::from_iter(once((TABLE_ID, t1_read_watermark.clone()))),
                    compaction_catalog_agent_ref,
                ),
            );

            iter.rewind().await.unwrap();
            assert!(iter.is_valid());
            // Expected t1 survivors: watermark column col_2 = i for i in 5..10.
            let mut t1_kv_pairs = (5..10_i32)
                .map(|i| {
                    let (k, v) = gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        i,
                        i,
                        &t1_pk_serde,
                        &t1_pk_indices,
                    );
                    (k, v)
                })
                .collect_vec();

            t1_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));

            // Expected t2 output: everything (no watermark for t2 here).
            let mut t2_kv_pairs = (0..10_i32)
                .map(|i| {
                    gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        0,
                        0,
                        &t2_pk_serde,
                        &t2_pk_indices,
                    )
                })
                .collect_vec();

            t2_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
            // t1's keys sort before t2's (smaller table id), so the merged
            // output is all t1 entries followed by all t2 entries.
            let mut index = 0;
            for _ in 0..t1_kv_pairs.len() {
                assert!(t1_kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                iter.next().await.unwrap();
                index += 1;
            }

            assert!(iter.is_valid());
            assert_eq!(t1_kv_pairs.len(), index);

            index = 0;
            for _ in 0..t2_kv_pairs.len() {
                assert!(t2_kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                iter.next().await.unwrap();
                index += 1;
            }

            assert!(!iter.is_valid());
            assert_eq!(t2_kv_pairs.len(), index);
        }

        {
            // Mixed stack: the non-pk-prefix iterator filters t1, then the
            // pk-prefix iterator on top filters t2.
            let t1_iter = t1_shared_buffer_batch.clone().into_forward_iter();
            let t2_iter = t2_shared_buffer_batch.clone().into_forward_iter();
            let iter_vec = vec![t1_iter, t2_iter];
            let merge_iter = MergeIterator::new(iter_vec);

            let full_key_filter_key_extractor =
                FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

            let table_id_to_vnode = HashMap::from_iter(
                vec![
                    (t1_id, VirtualNode::COUNT_FOR_TEST),
                    (t2_id, VirtualNode::COUNT_FOR_TEST),
                ]
                .into_iter(),
            );

            // t2 maps to None: it has no non-pk-prefix watermark serde.
            let table_id_to_watermark_serde = HashMap::from_iter(
                vec![
                    (
                        t1_id,
                        Some((
                            t1_pk_serde.clone(),
                            watermark_col_serde.clone(),
                            t1_watermark_col_idx_in_pk,
                        )),
                    ),
                    (t2_id, None),
                ]
                .into_iter(),
            );

            let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                full_key_filter_key_extractor,
                table_id_to_vnode,
                table_id_to_watermark_serde,
                HashMap::default(),
            ));

            let non_pk_prefix_iter = NonPkPrefixSkipWatermarkIterator::new(
                merge_iter,
                NonPkPrefixSkipWatermarkState::new(
                    BTreeMap::from_iter(once((t1_id, t1_read_watermark.clone()))),
                    compaction_catalog_agent_ref.clone(),
                ),
            );

            let mut mix_iter = PkPrefixSkipWatermarkIterator::new(
                non_pk_prefix_iter,
                PkPrefixSkipWatermarkState::new(BTreeMap::from_iter(once((
                    t2_id,
                    t2_read_watermark.clone(),
                )))),
            );

            mix_iter.rewind().await.unwrap();
            assert!(mix_iter.is_valid());

            // Expected t1 survivors: col_2 = i for i in 5..10 (non-pk-prefix layer).
            let mut t1_kv_pairs = (5..10_i32)
                .map(|i| {
                    let (k, v) = gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        i,
                        i,
                        &t1_pk_serde,
                        &t1_pk_indices,
                    );
                    (k, v)
                })
                .collect_vec();

            t1_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));

            // Expected t2 survivors: pk-leading col_0 = 10 - i >= 5, i.e.
            // i in 0..=5 (pk-prefix layer).
            let mut t2_kv_pairs = (0..=5_i32)
                .map(|i| {
                    gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        0,
                        0,
                        &t2_pk_serde,
                        &t2_pk_indices,
                    )
                })
                .collect_vec();

            t2_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));

            let mut index = 0;
            for _ in 0..t1_kv_pairs.len() {
                assert!(
                    t1_kv_pairs[index].0.as_ref() == mix_iter.key().user_key.table_key.as_ref()
                );
                mix_iter.next().await.unwrap();
                index += 1;
            }

            assert!(mix_iter.is_valid());
            assert_eq!(t1_kv_pairs.len(), index);

            index = 0;

            for _ in 0..t2_kv_pairs.len() {
                assert!(
                    t2_kv_pairs[index].0.as_ref() == mix_iter.key().user_key.table_key.as_ref()
                );
                mix_iter.next().await.unwrap();
                index += 1;
            }

            assert!(!mix_iter.is_valid());
            assert_eq!(t2_kv_pairs.len(), index);
        }
    }
1505}