// risingwave_storage/hummock/iterator/skip_watermark.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::{BTreeMap, VecDeque};
17
18use bytes::Bytes;
19use risingwave_common::catalog::TableId;
20use risingwave_common::hash::VirtualNode;
21use risingwave_common::row::Row;
22use risingwave_common::types::Datum;
23use risingwave_common::util::row_serde::OrderedRowSerde;
24use risingwave_hummock_sdk::compaction_group::hummock_version_ext::safe_epoch_read_table_watermarks_impl;
25use risingwave_hummock_sdk::key::FullKey;
26use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
27use risingwave_hummock_sdk::table_watermark::{
28    ReadTableWatermark, TableWatermarks, WatermarkDirection,
29};
30
31use super::SkipWatermarkState;
32use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, ValueWatermarkColumnSerdeRef};
33use crate::hummock::HummockResult;
34use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
35use crate::hummock::value::HummockValue;
36use crate::monitor::StoreLocalStatistic;
37
/// A forward iterator wrapper that skips key-value pairs filtered out by
/// per-(table, vnode) watermarks, while accounting the skipped entries into
/// per-table statistics.
pub struct SkipWatermarkIterator<I, S> {
    /// The underlying forward iterator being filtered.
    inner: I,
    /// Watermark state deciding whether the current key-value pair should be skipped.
    state: S,
    /// The stats of skipped key-value pairs for each table.
    skipped_entry_table_stats: TableStatsMap,
    /// The id of table currently undergoing processing.
    last_table_id: Option<TableId>,
    /// The stats of table currently undergoing processing.
    last_table_stats: TableStats,
}
48
impl<I: HummockIterator<Direction = Forward>, S: SkipWatermarkState> SkipWatermarkIterator<I, S> {
    /// Wraps `inner` with the given watermark `state`, starting with empty skip statistics.
    pub fn new(inner: I, state: S) -> Self {
        Self {
            inner,
            state,
            skipped_entry_table_stats: TableStatsMap::default(),
            last_table_id: None,
            last_table_stats: TableStats::default(),
        }
    }

    /// Rebuilds the pending watermark queue (delegates to `SkipWatermarkState::reset_watermark`).
    fn reset_watermark(&mut self) {
        self.state.reset_watermark();
    }

    /// Clears all accumulated skip statistics, including the in-progress per-table counters.
    fn reset_skipped_entry_table_stats(&mut self) {
        self.skipped_entry_table_stats = TableStatsMap::default();
        self.last_table_id = None;
        self.last_table_stats = TableStats::default();
    }

    /// Advance the key until iterator invalid or the current key will not be filtered by the latest watermark.
    /// Calling this method should ensure that the first remaining watermark has been advanced to the current key.
    ///
    /// Every skipped entry is subtracted from the stats of its table, and the in-progress
    /// stats are flushed into `skipped_entry_table_stats` before returning.
    async fn advance_key_and_watermark(&mut self) -> HummockResult<()> {
        // advance key and watermark in an interleave manner until nothing
        // changed after the method is called.
        while self.inner.is_valid() {
            if !self
                .state
                .should_delete(&self.inner.key(), self.inner.value())
            {
                break;
            }

            // Crossed into a new table: flush the stats of the previous table first.
            if self
                .last_table_id
                .is_none_or(|last_table_id| last_table_id != self.inner.key().user_key.table_id)
            {
                self.add_last_table_stats();
                self.last_table_id = Some(self.inner.key().user_key.table_id);
            }
            // The deltas are negative: they record what is being dropped.
            self.last_table_stats.total_key_count -= 1;
            self.last_table_stats.total_key_size -= self.inner.key().encoded_len() as i64;
            self.last_table_stats.total_value_size -= self.inner.value().encoded_len() as i64;

            self.inner.next().await?;
        }
        self.add_last_table_stats();
        Ok(())
    }

    /// Flushes `last_table_stats` into `skipped_entry_table_stats` under `last_table_id`,
    /// resetting both. No-op when no table is currently being processed.
    fn add_last_table_stats(&mut self) {
        let Some(last_table_id) = self.last_table_id.take() else {
            return;
        };
        let delta = std::mem::take(&mut self.last_table_stats);
        let e = self
            .skipped_entry_table_stats
            .entry(last_table_id)
            .or_default();
        e.total_key_count += delta.total_key_count;
        e.total_key_size += delta.total_key_size;
        e.total_value_size += delta.total_value_size;
    }
}
116
impl<I: HummockIterator<Direction = Forward>, S: SkipWatermarkState> HummockIterator
    for SkipWatermarkIterator<I, S>
{
    type Direction = Forward;

    async fn next(&mut self) -> HummockResult<()> {
        self.inner.next().await?;
        // Check whether there is any remaining watermark and return early to
        // avoid calling the async `advance_key_and_watermark`, since in benchmark
        // performance downgrade is observed without this early return.
        if self.state.has_watermark() {
            self.advance_key_and_watermark().await?;
        }
        Ok(())
    }

    fn key(&self) -> FullKey<&[u8]> {
        self.inner.key()
    }

    fn value(&self) -> HummockValue<&[u8]> {
        self.inner.value()
    }

    fn is_valid(&self) -> bool {
        self.inner.is_valid()
    }

    // Rebuild the watermark queue and clear skip stats before repositioning, then skip
    // any leading entries filtered by the watermarks.
    async fn rewind(&mut self) -> HummockResult<()> {
        self.reset_watermark();
        self.reset_skipped_entry_table_stats();
        self.inner.rewind().await?;
        self.advance_key_and_watermark().await?;
        Ok(())
    }

    // Same reset-then-skip sequence as `rewind`, but positions at `key` first.
    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
        self.reset_watermark();
        self.reset_skipped_entry_table_stats();
        self.inner.seek(key).await?;
        self.advance_key_and_watermark().await?;
        Ok(())
    }

    // Merge the skipped-entry stats into the collector before delegating to the inner iterator.
    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
        add_table_stats_map(
            &mut stats.skipped_by_watermark_table_stats,
            &self.skipped_entry_table_stats,
        );
        self.inner.collect_local_statistic(stats)
    }

    fn value_meta(&self) -> ValueMeta {
        self.inner.value_meta()
    }
}
/// Watermark state for tables whose watermark can be compared directly against the
/// key bytes after the vnode prefix (pk-prefix watermarks).
pub struct PkPrefixSkipWatermarkState {
    /// All read watermarks, keyed by table id.
    watermarks: BTreeMap<TableId, ReadTableWatermark>,
    /// Pending watermarks sorted by (table id, vnode); the front entry is the one
    /// currently compared against the iterator position.
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Bytes)>,
}
177
impl SkipWatermarkState for PkPrefixSkipWatermarkState {
    #[inline(always)]
    fn has_watermark(&self) -> bool {
        !self.remain_watermarks.is_empty()
    }

    /// Returns whether the pair at `key` should be skipped according to the front
    /// pending watermark. May pop exhausted watermarks via `advance_watermark` when
    /// the key has moved past the front entry.
    fn should_delete(&mut self, key: &FullKey<&[u8]>, value: HummockValue<&[u8]>) -> bool {
        if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            let key_table_id = key.user_key.table_id;
            let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
            match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
                Ordering::Less => {
                    // The key has not reached the front watermark's (table, vnode) yet.
                    return false;
                }
                Ordering::Equal => {
                    return direction.key_filter_by_watermark(inner_key, watermark);
                }
                Ordering::Greater => {
                    // The current key has advanced over the watermark.
                    // We may advance the watermark before advancing the key.
                    return self.advance_watermark(key, value);
                }
            }
        }
        false
    }

    /// Rebuilds the pending queue from `watermarks`. Iterating the two `BTreeMap`s keeps
    /// the queue sorted by (table id, vnode), which `should_delete`/`advance_watermark`
    /// rely on.
    fn reset_watermark(&mut self) {
        self.remain_watermarks = self
            .watermarks
            .iter()
            .flat_map(|(table_id, read_watermarks)| {
                read_watermarks
                    .vnode_watermarks
                    .iter()
                    .map(|(vnode, watermarks)| {
                        (
                            *table_id,
                            *vnode,
                            read_watermarks.direction,
                            watermarks.clone(),
                        )
                    })
            })
            .collect();
    }

    /// Advance watermark until no watermark remains or the first watermark can possibly
    /// filter out the current or future key.
    ///
    /// Return a flag indicating whether the current key will be filtered by the current watermark.
    fn advance_watermark(&mut self, key: &FullKey<&[u8]>, _value: HummockValue<&[u8]>) -> bool {
        let key_table_id = key.user_key.table_id;
        let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
        while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
                Ordering::Less => {
                    // This watermark's (table, vnode) is already behind the key: drop it.
                    self.remain_watermarks.pop_front();
                    continue;
                }
                Ordering::Equal => {
                    match direction {
                        WatermarkDirection::Ascending => {
                            match inner_key.cmp(watermark.as_ref()) {
                                Ordering::Less => {
                                    // The current key will be filtered by the watermark.
                                    // Return true to further advance the key.
                                    return true;
                                }
                                Ordering::Equal | Ordering::Greater => {
                                    // The current key has passed the watermark.
                                    // Advance the next watermark.
                                    self.remain_watermarks.pop_front();
                                    // Since it is impossible for a (table_id, vnode) tuple to have multiple
                                    // watermark, after the pop_front, the next (table_id, vnode) must have
                                    // exceeded the current key, and we can directly return and mark that the
                                    // current key is not filtered by the watermark at the front.
                                    #[cfg(debug_assertions)]
                                    {
                                        if let Some((next_table_id, next_vnode, _, _)) =
                                            self.remain_watermarks.front()
                                        {
                                            assert!(
                                                (next_table_id, next_vnode)
                                                    > (&key_table_id, &key_vnode)
                                            );
                                        }
                                    }
                                    return false;
                                }
                            }
                        }
                        WatermarkDirection::Descending => {
                            return match inner_key.cmp(watermark.as_ref()) {
                                // Current key as not reached the watermark. Just return.
                                Ordering::Less | Ordering::Equal => false,
                                // Current key will be filtered by the watermark.
                                // Return true to further advance the key.
                                Ordering::Greater => true,
                            };
                        }
                    }
                }
                Ordering::Greater => {
                    // The front watermark is ahead of the key: nothing to skip yet.
                    return false;
                }
            }
        }
        false
    }
}
289
290impl PkPrefixSkipWatermarkState {
291    pub fn new(watermarks: BTreeMap<TableId, ReadTableWatermark>) -> Self {
292        Self {
293            remain_watermarks: VecDeque::new(),
294            watermarks,
295        }
296    }
297
298    pub fn from_safe_epoch_watermarks(
299        safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
300    ) -> Self {
301        let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
302        Self::new(watermarks)
303    }
304}
305
/// Watermark state for tables whose watermark column sits at some index inside the
/// primary key (not necessarily the prefix), so keys must be deserialized before
/// comparison against the watermark datum.
pub struct NonPkPrefixSkipWatermarkState {
    /// All read watermarks, keyed by table id.
    watermarks: BTreeMap<TableId, ReadTableWatermark>,
    /// Pending watermarks sorted by (table id, vnode), each watermark deserialized
    /// into a `Datum` of the watermark column.
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Datum)>,
    /// Catalog agent used to look up per-table serdes.
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,

    /// Cached `(pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk)` for the
    /// table of the front watermark; invalidated when a watermark is popped.
    last_serde: Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
    /// Last table id observed in `should_delete`.
    last_table_id: Option<TableId>,
}
314
315impl NonPkPrefixSkipWatermarkState {
316    pub fn new(
317        watermarks: BTreeMap<TableId, ReadTableWatermark>,
318        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
319    ) -> Self {
320        Self {
321            remain_watermarks: VecDeque::new(),
322            watermarks,
323            compaction_catalog_agent_ref,
324            last_serde: None,
325            last_table_id: None,
326        }
327    }
328
329    pub fn from_safe_epoch_watermarks(
330        safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
331        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
332    ) -> Self {
333        let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
334        Self::new(watermarks, compaction_catalog_agent_ref)
335    }
336}
337
impl SkipWatermarkState for NonPkPrefixSkipWatermarkState {
    #[inline(always)]
    fn has_watermark(&self) -> bool {
        !self.remain_watermarks.is_empty()
    }

    /// Returns whether the pair at `key` should be skipped, by deserializing the pk and
    /// comparing its watermark column against the front pending watermark datum.
    fn should_delete(&mut self, key: &FullKey<&[u8]>, value: HummockValue<&[u8]>) -> bool {
        if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            let key_table_id = key.user_key.table_id;
            {
                // NOTE(review): `last_table_id` is written here but never read anywhere
                // in this impl. It looks intended to invalidate `last_serde` when the key
                // switches tables — confirm whether `self.last_serde = None;` is missing.
                if self
                    .last_table_id
                    .is_none_or(|last_table_id| last_table_id != key_table_id)
                {
                    self.last_table_id = Some(key_table_id);
                }
            }

            let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
            match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
                Ordering::Less => {
                    return false;
                }
                Ordering::Equal => {
                    // Lazily fetch and cache the serde for this table; the cache is
                    // cleared whenever a watermark is popped in `advance_watermark`.
                    if self.last_serde.is_none() {
                        self.last_serde =
                            self.compaction_catalog_agent_ref.watermark_serde(*table_id);
                    }
                    let (pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk) =
                        self.last_serde.as_ref().unwrap();
                    let row = pk_prefix_serde
                        .deserialize(inner_key)
                        .unwrap_or_else(|_| {
                            panic!("Failed to deserialize pk_prefix inner_key {:?} serde data_types {:?} order_types {:?}", inner_key, pk_prefix_serde.get_data_types(), pk_prefix_serde.get_order_types());
                        });
                    // Extract the watermark column from the pk and compare against the watermark datum.
                    let watermark_col_in_pk = row.datum_at(*watermark_col_idx_in_pk);
                    return direction.datum_filter_by_watermark(
                        watermark_col_in_pk,
                        watermark,
                        watermark_col_serde.get_order_types()[0],
                    );
                }
                Ordering::Greater => {
                    // The current key has advanced over the watermark.
                    // We may advance the watermark before advancing the key.
                    return self.advance_watermark(key, value);
                }
            }
        }
        false
    }

    /// Rebuilds the pending queue, deserializing each watermark's bytes into a `Datum`
    /// of the watermark column. Entries of tables without a watermark serde are dropped.
    fn reset_watermark(&mut self) {
        self.last_serde = None;
        self.remain_watermarks = self
            .watermarks
            .iter()
            .flat_map(|(table_id, read_watermarks)| {
                let watermark_serde = self.compaction_catalog_agent_ref.watermark_serde(*table_id).map(|(_pk_serde, watermark_serde, _watermark_col_idx_in_pk)| watermark_serde);

                read_watermarks
                    .vnode_watermarks
                    .iter()
                    .flat_map(move |(vnode, watermarks)| {
                        // TODO(ttl): if the watermark column is in the value, we may get a `None` here, support it.
                        let watermark_serde = watermark_serde.as_ref()?;
                        Some((
                            *table_id,
                            *vnode,
                            read_watermarks.direction,
                            {
                                let row = watermark_serde
                                .deserialize(watermarks).unwrap_or_else(|_| {
                                    panic!("Failed to deserialize watermark {:?} serde data_types {:?} order_types {:?}", watermarks, watermark_serde.get_data_types(), watermark_serde.get_order_types());
                                });
                                row[0].clone()
                            },
                        ))
                    })
            })
            .collect();
    }

    /// Pops watermarks the key has already passed, then answers whether the front
    /// watermark filters the current key (same check as `should_delete`'s `Equal` arm).
    fn advance_watermark(&mut self, key: &FullKey<&[u8]>, _value: HummockValue<&[u8]>) -> bool {
        let key_table_id = key.user_key.table_id;
        let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
        while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
                Ordering::Less => {
                    self.remain_watermarks.pop_front();
                    // The cached serde belongs to the popped watermark's table; invalidate it.
                    self.last_serde = None;
                    continue;
                }
                Ordering::Equal => {
                    if self.last_serde.is_none() {
                        self.last_serde =
                            self.compaction_catalog_agent_ref.watermark_serde(*table_id);
                    }
                    let (pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk) =
                        self.last_serde.as_ref().unwrap();

                    let row = pk_prefix_serde
                        .deserialize(inner_key)
                        .unwrap_or_else(|_| {
                            panic!("Failed to deserialize pk_prefix inner_key {:?} serde data_types {:?} order_types {:?}", inner_key, pk_prefix_serde.get_data_types(), pk_prefix_serde.get_order_types());
                        });
                    let watermark_col_in_pk = row.datum_at(*watermark_col_idx_in_pk);

                    return direction.datum_filter_by_watermark(
                        watermark_col_in_pk,
                        watermark,
                        watermark_col_serde.get_order_types()[0],
                    );
                }
                Ordering::Greater => {
                    return false;
                }
            }
        }
        false
    }
}
460
/// Skip-watermark iterator for pk-prefix watermarks.
pub type PkPrefixSkipWatermarkIterator<I> = SkipWatermarkIterator<I, PkPrefixSkipWatermarkState>;

/// Skip-watermark iterator for watermarks on a non-prefix pk column.
pub type NonPkPrefixSkipWatermarkIterator<I> =
    SkipWatermarkIterator<I, NonPkPrefixSkipWatermarkState>;

/// Skip-watermark iterator for watermarks stored in the value columns.
pub type ValueSkipWatermarkIterator<I> = SkipWatermarkIterator<I, ValueSkipWatermarkState>;
467
/// Watermark state for tables whose watermark column lives in the *value* columns
/// rather than in the key, so filtering requires deserializing the value.
pub struct ValueSkipWatermarkState {
    /// All read watermarks, keyed by table id.
    pub watermarks: BTreeMap<TableId, ReadTableWatermark>,
    /// Pending watermarks sorted by (table id, vnode); the front entry is the one
    /// currently compared against the iterator position.
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Bytes)>,
    /// Catalog agent used to look up the value-watermark-column serde per table.
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    /// Cached serde for the table of the front watermark; invalidated when a watermark is popped.
    last_serde: Option<ValueWatermarkColumnSerdeRef>,
    /// Last table id observed in `should_delete`.
    last_table_id: Option<TableId>,
}
475
476impl ValueSkipWatermarkState {
477    pub fn new(
478        watermarks: BTreeMap<TableId, ReadTableWatermark>,
479        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
480    ) -> Self {
481        Self {
482            remain_watermarks: VecDeque::new(),
483            watermarks,
484            compaction_catalog_agent_ref,
485            last_serde: None,
486            last_table_id: None,
487        }
488    }
489
490    pub fn from_safe_epoch_watermarks(
491        safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
492        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
493    ) -> Self {
494        let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
495        Self::new(watermarks, compaction_catalog_agent_ref)
496    }
497
498    pub fn may_delete(&self, key: &FullKey<&[u8]>) -> bool {
499        let table_id = key.user_key.table_id;
500        self.watermarks.contains_key(&table_id)
501    }
502}
503
impl SkipWatermarkState for ValueSkipWatermarkState {
    #[inline(always)]
    fn has_watermark(&self) -> bool {
        !self.remain_watermarks.is_empty()
    }

    /// Returns whether the pair should be skipped, deserializing the watermark column
    /// out of the *value*. Non-`Put` values (deletes) are never skipped, and
    /// deserialization failures fall back to keeping the pair.
    fn should_delete(&mut self, key: &FullKey<&[u8]>, value: HummockValue<&[u8]>) -> bool {
        if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            let key_table_id = key.user_key.table_id;
            let key_vnode = key.user_key.table_key.vnode_part();
            // NOTE(review): `last_table_id` is written here but never read anywhere in
            // this impl — confirm whether it was meant to also invalidate `last_serde`
            // on a table switch.
            if self
                .last_table_id
                .is_none_or(|last_table_id| last_table_id != key_table_id)
            {
                self.last_table_id = Some(key_table_id);
            }
            match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
                Ordering::Less => {
                    return false;
                }
                Ordering::Equal => {
                    // Only `Put` carries a value to inspect; deletes are kept.
                    let HummockValue::Put(value) = value else {
                        return false;
                    };
                    // Lazily fetch and cache the serde; cleared when a watermark is popped.
                    if self.last_serde.is_none() {
                        self.last_serde = self
                            .compaction_catalog_agent_ref
                            .value_watermark_serde(*table_id);
                    }
                    let last_serde = self.last_serde.as_ref().unwrap();
                    let Ok(watermark_column_value) = last_serde.deserialize(value) else {
                        tracing::error!(
                            ?table_id,
                            ?vnode,
                            ?value,
                            "Failed to deserialize watermark column"
                        );
                        return false;
                    };
                    // NOTE(review): this arm logs the missing-column case at `debug!`
                    // while the identical case in `advance_watermark` logs at `warn!` —
                    // confirm which level is intended.
                    let Some(watermark_column_value) = watermark_column_value else {
                        tracing::debug!(
                            ?table_id,
                            ?vnode,
                            ?value,
                            "The specified watermark column was not found in the value, likely due to the use of a non-current catalog. Restarting the compactor should resolve this issue."
                        );
                        return false;
                    };
                    return direction.key_filter_by_watermark(&watermark_column_value, watermark);
                }
                Ordering::Greater => {
                    // The current key has advanced over the watermark.
                    // We may advance the watermark before advancing the key.
                    return self.advance_watermark(key, value);
                }
            }
        }
        false
    }

    /// Rebuilds the pending queue from `watermarks`; `BTreeMap` iteration keeps it
    /// sorted by (table id, vnode).
    fn reset_watermark(&mut self) {
        self.last_serde = None;
        self.remain_watermarks = self
            .watermarks
            .iter()
            .flat_map(|(table_id, read_watermarks)| {
                read_watermarks
                    .vnode_watermarks
                    .iter()
                    .map(|(vnode, watermarks)| {
                        (
                            *table_id,
                            *vnode,
                            read_watermarks.direction,
                            watermarks.clone(),
                        )
                    })
            })
            .collect();
    }

    /// Pops watermarks the key has already passed, then answers whether the front
    /// watermark filters the current key. The `Equal` arm duplicates the value
    /// deserialization logic of `should_delete`'s `Equal` arm.
    fn advance_watermark(&mut self, key: &FullKey<&[u8]>, value: HummockValue<&[u8]>) -> bool {
        let key_table_id = key.user_key.table_id;
        let key_vnode = key.user_key.table_key.vnode_part();
        while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
                Ordering::Less => {
                    self.remain_watermarks.pop_front();
                    // The cached serde belongs to the popped watermark's table; invalidate it.
                    self.last_serde = None;
                    continue;
                }
                Ordering::Equal => {
                    let HummockValue::Put(value) = value else {
                        return false;
                    };
                    if self.last_serde.is_none() {
                        self.last_serde = self
                            .compaction_catalog_agent_ref
                            .value_watermark_serde(*table_id);
                    }
                    let last_serde = self.last_serde.as_ref().unwrap();
                    let Ok(watermark_column_value) = last_serde.deserialize(value) else {
                        tracing::error!(
                            ?table_id,
                            ?vnode,
                            ?value,
                            "Failed to deserialize watermark column."
                        );
                        return false;
                    };
                    let Some(watermark_column_value) = watermark_column_value else {
                        tracing::warn!(
                            ?table_id,
                            ?vnode,
                            ?value,
                            "The specified watermark column was not found in the value, likely due to the use of a non-current catalog. Restarting the compactor should resolve this issue."
                        );
                        return false;
                    };
                    return direction.key_filter_by_watermark(&watermark_column_value, watermark);
                }
                Ordering::Greater => {
                    return false;
                }
            }
        }
        false
    }
}
633
634#[cfg(test)]
635mod tests {
636    use std::collections::{BTreeMap, HashMap};
637    use std::iter::{empty, once};
638    use std::sync::Arc;
639
640    use bytes::Bytes;
641    use itertools::Itertools;
642    use risingwave_common::catalog::TableId;
643    use risingwave_common::hash::VirtualNode;
644    use risingwave_common::row::{OwnedRow, RowExt};
645    use risingwave_common::types::{DataType, ScalarImpl};
646    use risingwave_common::util::epoch::test_epoch;
647    use risingwave_common::util::row_serde::OrderedRowSerde;
648    use risingwave_common::util::sort_util::OrderType;
649    use risingwave_hummock_sdk::EpochWithGap;
650    use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey, gen_key_from_str};
651    use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, WatermarkDirection};
652
653    use super::PkPrefixSkipWatermarkState;
654    use crate::compaction_catalog_manager::{
655        CompactionCatalogAgent, FilterKeyExtractorImpl, FullKeyFilterKeyExtractor,
656    };
657    use crate::hummock::HummockValue;
658    use crate::hummock::iterator::{
659        HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
660        NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, SkipWatermarkState,
661    };
662    use crate::hummock::shared_buffer::shared_buffer_batch::{
663        SharedBufferBatch, SharedBufferValue,
664    };
665    use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode};
666
    // Epoch used for all test batches.
    const EPOCH: u64 = test_epoch(1);
    // Table id shared by all test data.
    const TABLE_ID: TableId = TableId::new(233);
669
    /// Drives `first` (the expected reference iterator) and `second` (the iterator
    /// under test) in lockstep — after seeking both to `(vnode, key_index)` when
    /// `seek_key` is given, otherwise after rewinding — and asserts they yield
    /// identical key-value pairs and exhaust together. `first == None` means
    /// `second` must be empty/invalid after positioning.
    async fn assert_iter_eq(
        mut first: Option<impl HummockIterator>,
        mut second: impl HummockIterator,
        seek_key: Option<(usize, usize)>,
    ) {
        if let Some((vnode, key_index)) = seek_key {
            let (seek_key, _) = gen_key_value(vnode, key_index);
            let full_key = FullKey {
                user_key: UserKey {
                    table_id: TABLE_ID,
                    table_key: seek_key,
                },
                epoch_with_gap: EpochWithGap::new_from_epoch(EPOCH),
            };
            if let Some(first) = &mut first {
                first.seek(full_key.to_ref()).await.unwrap();
            }
            second.seek(full_key.to_ref()).await.unwrap()
        } else {
            if let Some(first) = &mut first {
                first.rewind().await.unwrap();
            }
            second.rewind().await.unwrap();
        }

        if let Some(first) = &mut first {
            while first.is_valid() {
                assert!(second.is_valid());
                let first_key = first.key();
                let second_key = second.key();
                assert_eq!(first_key, second_key);
                assert_eq!(first.value(), second.value());
                first.next().await.unwrap();
                second.next().await.unwrap();
            }
        }
        // Both iterators must be exhausted at the same point.
        assert!(!second.is_valid());
    }
708
709    fn build_batch(
710        pairs: impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)>,
711        table_id: TableId,
712    ) -> Option<SharedBufferBatch> {
713        let pairs: Vec<_> = pairs.collect();
714        if pairs.is_empty() {
715            None
716        } else {
717            Some(SharedBufferBatch::for_test(pairs, EPOCH, table_id))
718        }
719    }
720
721    fn filter_with_watermarks(
722        iter: impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)>,
723        table_watermarks: ReadTableWatermark,
724    ) -> impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)> {
725        iter.filter(move |(key, _)| {
726            if let Some(watermark) = table_watermarks.vnode_watermarks.get(&key.vnode_part()) {
727                !table_watermarks
728                    .direction
729                    .key_filter_by_watermark(key.key_part(), watermark)
730            } else {
731                true
732            }
733        })
734    }
735
736    fn gen_inner_key(index: usize) -> String {
737        format!("key{:5}", index)
738    }
739
    /// Runs the pk-prefix skip-watermark iterator over a fixed set of
    /// (vnode, key_index) entries and checks it against a reference iterator built
    /// by pre-filtering the same entries with the given watermarks: once from a
    /// rewind, once seeking to every test key, and once seeking past the last key.
    async fn test_watermark(
        watermarks: impl IntoIterator<Item = (usize, usize)>,
        direction: WatermarkDirection,
    ) {
        let test_index: [(usize, usize); 7] =
            [(0, 2), (0, 3), (0, 4), (1, 1), (1, 3), (4, 2), (8, 1)];
        let items = test_index
            .iter()
            .map(|(vnode, key_index)| gen_key_value(*vnode, *key_index))
            .collect_vec();

        let read_watermark = ReadTableWatermark {
            direction,
            vnode_watermarks: BTreeMap::from_iter(watermarks.into_iter().map(
                |(vnode, key_index)| {
                    (
                        VirtualNode::from_index(vnode),
                        Bytes::from(gen_inner_key(key_index)),
                    )
                },
            )),
        };

        // Builds (reference iterator, iterator under test) over the same data.
        let gen_iters = || {
            let batch = build_batch(
                filter_with_watermarks(items.clone().into_iter(), read_watermark.clone()),
                TABLE_ID,
            );

            let iter = PkPrefixSkipWatermarkIterator::new(
                build_batch(items.clone().into_iter(), TABLE_ID)
                    .unwrap()
                    .into_forward_iter(),
                PkPrefixSkipWatermarkState::new(BTreeMap::from_iter(once((
                    TABLE_ID,
                    read_watermark.clone(),
                )))),
            );
            (batch.map(|batch| batch.into_forward_iter()), iter)
        };
        let (first, second) = gen_iters();
        assert_iter_eq(first, second, None).await;
        for (vnode, key_index) in &test_index {
            let (first, second) = gen_iters();
            assert_iter_eq(first, second, Some((*vnode, *key_index))).await;
        }
        // Seeking past every key must leave both iterators empty.
        let (last_vnode, last_key_index) = test_index.last().unwrap();
        let (first, second) = gen_iters();
        assert_iter_eq(first, second, Some((*last_vnode, last_key_index + 1))).await;
    }
790
791    fn gen_key_value(vnode: usize, index: usize) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
792        (
793            gen_key_from_str(VirtualNode::from_index(vnode), &gen_inner_key(index)),
794            SharedBufferValue::Insert(Bytes::copy_from_slice(
795                format!("{}-value-{}", vnode, index).as_bytes(),
796            )),
797        )
798    }
799
800    fn gen_full_key(table_id: TableId, vnode: usize, inner_key: &str) -> FullKey<Bytes> {
801        FullKey {
802            user_key: UserKey {
803                table_id,
804                table_key: gen_key_from_str(VirtualNode::from_index(vnode), inner_key),
805            },
806            epoch_with_gap: EpochWithGap::new_from_epoch(EPOCH),
807        }
808    }
809
810    fn assert_pk_prefix_decision_stable_after_progress(
811        watermarks: BTreeMap<TableId, ReadTableWatermark>,
812        progressed_key: FullKey<Bytes>,
813        later_key: FullKey<Bytes>,
814    ) {
815        let mut fresh_state = PkPrefixSkipWatermarkState::new(watermarks.clone());
816        fresh_state.reset_watermark();
817
818        let mut advanced_state = PkPrefixSkipWatermarkState::new(watermarks);
819        advanced_state.reset_watermark();
820
821        advanced_state.should_delete(&progressed_key.to_ref(), HummockValue::Put(&[]));
822
823        assert_eq!(
824            fresh_state.should_delete(&later_key.to_ref(), HummockValue::Put(&[])),
825            advanced_state.should_delete(&later_key.to_ref(), HummockValue::Put(&[])),
826            "pk-prefix watermark decision diverged after prior progress",
827        );
828    }
829
830    fn gen_non_pk_full_key(
831        table_id: TableId,
832        vnode: usize,
833        col_0: i32,
834        watermark_col: i32,
835        col_2: i32,
836        pk_serde: &OrderedRowSerde,
837    ) -> FullKey<Bytes> {
838        let pk = OwnedRow::new(vec![
839            Some(ScalarImpl::Int32(col_0)),
840            Some(ScalarImpl::Int32(watermark_col)),
841            Some(ScalarImpl::Int32(col_2)),
842        ]);
843        FullKey {
844            user_key: UserKey {
845                table_id,
846                table_key: serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode)),
847            },
848            epoch_with_gap: EpochWithGap::new_from_epoch(EPOCH),
849        }
850    }
851
852    fn assert_non_pk_prefix_decision_stable_after_progress(
853        watermarks: BTreeMap<TableId, ReadTableWatermark>,
854        compaction_catalog_agent_ref: Arc<CompactionCatalogAgent>,
855        progressed_key: FullKey<Bytes>,
856        later_key: FullKey<Bytes>,
857    ) {
858        let mut fresh_state = NonPkPrefixSkipWatermarkState::new(
859            watermarks.clone(),
860            compaction_catalog_agent_ref.clone(),
861        );
862        fresh_state.reset_watermark();
863
864        let mut advanced_state =
865            NonPkPrefixSkipWatermarkState::new(watermarks, compaction_catalog_agent_ref);
866        advanced_state.reset_watermark();
867
868        advanced_state.should_delete(&progressed_key.to_ref(), HummockValue::Put(&[]));
869
870        assert_eq!(
871            fresh_state.should_delete(&later_key.to_ref(), HummockValue::Put(&[])),
872            advanced_state.should_delete(&later_key.to_ref(), HummockValue::Put(&[])),
873            "non-pk watermark decision diverged after prior progress",
874        );
875    }
876
877    #[tokio::test]
878    async fn test_no_watermark() {
879        test_watermark(empty(), WatermarkDirection::Ascending).await;
880        test_watermark(empty(), WatermarkDirection::Descending).await;
881    }
882
883    #[tokio::test]
884    async fn test_too_low_watermark() {
885        test_watermark(vec![(0, 0)], WatermarkDirection::Ascending).await;
886        test_watermark(vec![(0, 10)], WatermarkDirection::Descending).await;
887    }
888
889    #[tokio::test]
890    async fn test_single_watermark() {
891        test_watermark(vec![(0, 3)], WatermarkDirection::Ascending).await;
892        test_watermark(vec![(0, 3)], WatermarkDirection::Descending).await;
893    }
894
895    #[tokio::test]
896    async fn test_watermark_vnode_no_data() {
897        test_watermark(vec![(3, 3)], WatermarkDirection::Ascending).await;
898        test_watermark(vec![(3, 3)], WatermarkDirection::Descending).await;
899    }
900
901    #[tokio::test]
902    async fn test_filter_all() {
903        test_watermark(
904            vec![(0, 5), (1, 4), (2, 0), (4, 3), (8, 2)],
905            WatermarkDirection::Ascending,
906        )
907        .await;
908        test_watermark(
909            vec![(0, 0), (1, 0), (2, 0), (4, 0), (8, 0)],
910            WatermarkDirection::Descending,
911        )
912        .await;
913    }
914
915    #[tokio::test]
916    async fn test_advance_multi_vnode() {
917        test_watermark(vec![(1, 2), (8, 0)], WatermarkDirection::Ascending).await;
918    }
919
920    #[test]
921    fn test_pk_prefix_watermark_decision_stable_after_prior_progress() {
922        let table_1 = TableId::new(1);
923        let table_2 = TableId::new(2);
924        let vnode = VirtualNode::from_index(0);
925        let watermarks = BTreeMap::from_iter([
926            (
927                table_1,
928                ReadTableWatermark {
929                    direction: WatermarkDirection::Ascending,
930                    vnode_watermarks: BTreeMap::from_iter(once((
931                        vnode,
932                        Bytes::from_static(b"mid"),
933                    ))),
934                },
935            ),
936            (
937                table_2,
938                ReadTableWatermark {
939                    direction: WatermarkDirection::Ascending,
940                    vnode_watermarks: BTreeMap::from_iter(once((
941                        vnode,
942                        Bytes::from_static(b"keep"),
943                    ))),
944                },
945            ),
946        ]);
947
948        assert_pk_prefix_decision_stable_after_progress(
949            watermarks.clone(),
950            gen_full_key(table_1, 0, "z-after-watermark"),
951            gen_full_key(table_2, 0, "drop-before-watermark"),
952        );
953
954        assert_pk_prefix_decision_stable_after_progress(
955            watermarks,
956            gen_full_key(table_1, 0, "z-after-watermark"),
957            gen_full_key(table_2, 0, "z-after-watermark"),
958        );
959    }
960
961    #[test]
962    fn test_non_pk_prefix_watermark_decision_stable_after_prior_progress() {
963        let table_1 = TableId::new(1);
964        let table_2 = TableId::new(2);
965        let vnode = VirtualNode::from_index(0);
966        let watermark_direction = WatermarkDirection::Ascending;
967        let watermark_col_serde =
968            OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]);
969        let pk_serde = OrderedRowSerde::new(
970            vec![DataType::Int32, DataType::Int32, DataType::Int32],
971            vec![
972                OrderType::ascending(),
973                OrderType::ascending(),
974                OrderType::ascending(),
975            ],
976        );
977        let watermark_col_idx_in_pk = 1;
978        let watermark = serialize_pk(
979            OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]),
980            &watermark_col_serde,
981        );
982
983        let watermarks = BTreeMap::from_iter([table_1, table_2].into_iter().map(|table_id| {
984            (
985                table_id,
986                ReadTableWatermark {
987                    direction: watermark_direction,
988                    vnode_watermarks: BTreeMap::from_iter(once((vnode, watermark.clone()))),
989                },
990            )
991        }));
992
993        let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
994            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
995            HashMap::from_iter(
996                [table_1, table_2]
997                    .into_iter()
998                    .map(|table_id| (table_id, VirtualNode::COUNT_FOR_TEST)),
999            ),
1000            HashMap::from_iter([table_1, table_2].into_iter().map(|table_id| {
1001                (
1002                    table_id,
1003                    Some((
1004                        pk_serde.clone(),
1005                        watermark_col_serde.clone(),
1006                        watermark_col_idx_in_pk,
1007                    )),
1008                )
1009            })),
1010            HashMap::default(),
1011        ));
1012
1013        assert_non_pk_prefix_decision_stable_after_progress(
1014            watermarks.clone(),
1015            compaction_catalog_agent_ref.clone(),
1016            gen_non_pk_full_key(table_1, 0, 1, 8, 1, &pk_serde),
1017            gen_non_pk_full_key(table_2, 0, 1, 3, 1, &pk_serde),
1018        );
1019
1020        assert_non_pk_prefix_decision_stable_after_progress(
1021            watermarks,
1022            compaction_catalog_agent_ref,
1023            gen_non_pk_full_key(table_1, 0, 1, 8, 1, &pk_serde),
1024            gen_non_pk_full_key(table_2, 0, 1, 8, 1, &pk_serde),
1025        );
1026    }
1027
1028    #[tokio::test]
1029    async fn test_non_pk_prefix_watermark() {
1030        fn gen_key_value(
1031            vnode: usize,
1032            col_0: i32,
1033            col_1: i32,
1034            col_2: i32,
1035            col_3: i32,
1036            pk_serde: &OrderedRowSerde,
1037            pk_indices: &[usize],
1038        ) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
1039            let r = OwnedRow::new(vec![
1040                Some(ScalarImpl::Int32(col_0)),
1041                Some(ScalarImpl::Int32(col_1)),
1042                Some(ScalarImpl::Int32(col_2)), // watermark column
1043                Some(ScalarImpl::Int32(col_3)),
1044            ]);
1045
1046            let pk = r.project(pk_indices);
1047
1048            let k1 = serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode));
1049            let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice(
1050                format!("{}-value-{}", vnode, col_2).as_bytes(),
1051            ));
1052            (k1, v1)
1053        }
1054
1055        let watermark_direction = WatermarkDirection::Ascending;
1056
1057        let watermark_col_serde =
1058            OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]);
1059        let pk_serde = OrderedRowSerde::new(
1060            vec![DataType::Int32, DataType::Int32, DataType::Int32],
1061            vec![
1062                OrderType::ascending(),
1063                OrderType::ascending(),
1064                OrderType::ascending(),
1065            ],
1066        );
1067
1068        let pk_indices = vec![0, 2, 3];
1069        let watermark_col_idx_in_pk = 1;
1070
1071        {
1072            // test single vnode
1073            let shared_buffer_batch = {
1074                let mut kv_pairs = (0..10)
1075                    .map(|i| gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices))
1076                    .collect_vec();
1077                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1078                build_batch(kv_pairs.into_iter(), TABLE_ID)
1079            }
1080            .unwrap();
1081
1082            {
1083                // empty read watermark
1084                let read_watermark = ReadTableWatermark {
1085                    direction: watermark_direction,
1086                    vnode_watermarks: BTreeMap::default(),
1087                };
1088
1089                let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![TABLE_ID]);
1090
1091                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
1092                    shared_buffer_batch.clone().into_forward_iter(),
1093                    NonPkPrefixSkipWatermarkState::new(
1094                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
1095                        compaction_catalog_agent_ref,
1096                    ),
1097                );
1098
1099                iter.rewind().await.unwrap();
1100                assert!(iter.is_valid());
1101                for i in 0..10 {
1102                    let (k, _v) = gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices);
1103                    assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref());
1104                    iter.next().await.unwrap();
1105                }
1106                assert!(!iter.is_valid());
1107            }
1108
1109            {
1110                // test watermark
1111                let watermark = {
1112                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
1113                    serialize_pk(r1, &watermark_col_serde)
1114                };
1115
1116                let read_watermark = ReadTableWatermark {
1117                    direction: watermark_direction,
1118                    vnode_watermarks: BTreeMap::from_iter(once((
1119                        VirtualNode::from_index(0),
1120                        watermark.clone(),
1121                    ))),
1122                };
1123
1124                let full_key_filter_key_extractor =
1125                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
1126
1127                let table_id_to_vnode =
1128                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));
1129
1130                let table_id_to_watermark_serde = HashMap::from_iter(once((
1131                    TABLE_ID,
1132                    Some((
1133                        pk_serde.clone(),
1134                        watermark_col_serde.clone(),
1135                        watermark_col_idx_in_pk,
1136                    )),
1137                )));
1138
1139                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
1140                    full_key_filter_key_extractor,
1141                    table_id_to_vnode,
1142                    table_id_to_watermark_serde,
1143                    HashMap::default(),
1144                ));
1145
1146                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
1147                    shared_buffer_batch.clone().into_forward_iter(),
1148                    NonPkPrefixSkipWatermarkState::new(
1149                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
1150                        compaction_catalog_agent_ref,
1151                    ),
1152                );
1153
1154                iter.rewind().await.unwrap();
1155                assert!(iter.is_valid());
1156                for i in 5..10 {
1157                    let (k, _v) = gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices);
1158                    assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref());
1159                    iter.next().await.unwrap();
1160                }
1161                assert!(!iter.is_valid());
1162            }
1163        }
1164
1165        {
1166            // test multi vnode
1167            let shared_buffer_batch = {
1168                let mut kv_pairs = (0..10_i32)
1169                    .map(|i| gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices))
1170                    .collect_vec();
1171
1172                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1173                build_batch(kv_pairs.into_iter(), TABLE_ID)
1174            };
1175
1176            {
1177                // test watermark
1178                let watermark = {
1179                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
1180                    serialize_pk(r1, &watermark_col_serde)
1181                };
1182
1183                let read_watermark = ReadTableWatermark {
1184                    direction: watermark_direction,
1185                    vnode_watermarks: BTreeMap::from_iter(
1186                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
1187                    ),
1188                };
1189
1190                let full_key_filter_key_extractor =
1191                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
1192
1193                let table_id_to_vnode =
1194                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));
1195
1196                let table_id_to_watermark_serde = HashMap::from_iter(once((
1197                    TABLE_ID,
1198                    Some((
1199                        pk_serde.clone(),
1200                        watermark_col_serde.clone(),
1201                        watermark_col_idx_in_pk,
1202                    )),
1203                )));
1204
1205                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
1206                    full_key_filter_key_extractor,
1207                    table_id_to_vnode,
1208                    table_id_to_watermark_serde,
1209                    HashMap::default(),
1210                ));
1211
1212                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
1213                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
1214                    NonPkPrefixSkipWatermarkState::new(
1215                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
1216                        compaction_catalog_agent_ref,
1217                    ),
1218                );
1219
1220                iter.rewind().await.unwrap();
1221                assert!(iter.is_valid());
1222                let mut kv_pairs = (5..10_i32)
1223                    .map(|i| {
1224                        let (k, v) =
1225                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
1226                        (k, v)
1227                    })
1228                    .collect_vec();
1229                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1230                let mut index = 0;
1231                while iter.is_valid() {
1232                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
1233                    iter.next().await.unwrap();
1234                    index += 1;
1235                }
1236            }
1237
1238            {
1239                // test watermark
1240                let watermark = {
1241                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
1242                    serialize_pk(r1, &watermark_col_serde)
1243                };
1244
1245                let read_watermark = ReadTableWatermark {
1246                    direction: watermark_direction,
1247                    vnode_watermarks: BTreeMap::from_iter(
1248                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
1249                    ),
1250                };
1251
1252                let full_key_filter_key_extractor =
1253                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
1254
1255                let table_id_to_vnode =
1256                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));
1257
1258                let table_id_to_watermark_serde = HashMap::from_iter(once((
1259                    TABLE_ID,
1260                    Some((
1261                        pk_serde.clone(),
1262                        watermark_col_serde.clone(),
1263                        watermark_col_idx_in_pk,
1264                    )),
1265                )));
1266
1267                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
1268                    full_key_filter_key_extractor,
1269                    table_id_to_vnode,
1270                    table_id_to_watermark_serde,
1271                    HashMap::default(),
1272                ));
1273
1274                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
1275                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
1276                    NonPkPrefixSkipWatermarkState::new(
1277                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
1278                        compaction_catalog_agent_ref,
1279                    ),
1280                );
1281
1282                iter.rewind().await.unwrap();
1283                assert!(iter.is_valid());
1284                let mut kv_pairs = (5..10_i32)
1285                    .map(|i| {
1286                        let (k, v) =
1287                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
1288                        (k, v)
1289                    })
1290                    .collect_vec();
1291                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1292                let mut index = 0;
1293                while iter.is_valid() {
1294                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
1295                    iter.next().await.unwrap();
1296                    index += 1;
1297                }
1298            }
1299
1300            {
1301                // test watermark Descending
1302                let watermark = {
1303                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
1304                    serialize_pk(r1, &watermark_col_serde)
1305                };
1306
1307                let read_watermark = ReadTableWatermark {
1308                    direction: WatermarkDirection::Descending,
1309                    vnode_watermarks: BTreeMap::from_iter(
1310                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
1311                    ),
1312                };
1313
1314                let full_key_filter_key_extractor =
1315                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
1316
1317                let table_id_to_vnode =
1318                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));
1319
1320                let table_id_to_watermark_serde = HashMap::from_iter(once((
1321                    TABLE_ID,
1322                    Some((
1323                        pk_serde.clone(),
1324                        watermark_col_serde.clone(),
1325                        watermark_col_idx_in_pk,
1326                    )),
1327                )));
1328
1329                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
1330                    full_key_filter_key_extractor,
1331                    table_id_to_vnode,
1332                    table_id_to_watermark_serde,
1333                    HashMap::default(),
1334                ));
1335
1336                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
1337                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
1338                    NonPkPrefixSkipWatermarkState::new(
1339                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
1340                        compaction_catalog_agent_ref,
1341                    ),
1342                );
1343
1344                iter.rewind().await.unwrap();
1345                assert!(iter.is_valid());
1346                let mut kv_pairs = (0..=5_i32)
1347                    .map(|i| {
1348                        let (k, v) =
1349                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
1350                        (k, v)
1351                    })
1352                    .collect_vec();
1353                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1354                let mut index = 0;
1355                while iter.is_valid() {
1356                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
1357                    iter.next().await.unwrap();
1358                    index += 1;
1359                }
1360            }
1361        }
1362    }
1363
1364    #[tokio::test]
1365    async fn test_mix_watermark() {
1366        fn gen_key_value(
1367            vnode: usize,
1368            col_0: i32,
1369            col_1: i32,
1370            col_2: i32,
1371            col_3: i32,
1372            pk_serde: &OrderedRowSerde,
1373            pk_indices: &[usize],
1374        ) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
1375            let r = OwnedRow::new(vec![
1376                Some(ScalarImpl::Int32(col_0)),
1377                Some(ScalarImpl::Int32(col_1)),
1378                Some(ScalarImpl::Int32(col_2)), // watermark column
1379                Some(ScalarImpl::Int32(col_3)),
1380            ]);
1381
1382            let pk = r.project(pk_indices);
1383
1384            let k1 = serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode));
1385            let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice(
1386                format!("{}-value-{}-{}-{}-{}", vnode, col_0, col_1, col_2, col_3).as_bytes(),
1387            ));
1388
1389            (k1, v1)
1390        }
1391
1392        let watermark_col_serde =
1393            OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]);
1394        let t1_pk_serde = OrderedRowSerde::new(
1395            vec![DataType::Int32, DataType::Int32, DataType::Int32],
1396            vec![
1397                OrderType::ascending(),
1398                OrderType::ascending(),
1399                OrderType::ascending(),
1400            ],
1401        );
1402
1403        let t1_pk_indices = vec![0, 2, 3];
1404        let t1_watermark_col_idx_in_pk = 1;
1405
1406        let t2_pk_indices = vec![0, 1, 2];
1407
1408        let t2_pk_serde = OrderedRowSerde::new(
1409            vec![DataType::Int32, DataType::Int32, DataType::Int32],
1410            vec![
1411                OrderType::ascending(),
1412                OrderType::ascending(),
1413                OrderType::ascending(),
1414            ],
1415        );
1416
1417        let t1_id = TABLE_ID;
1418        let t2_id = TableId::from(t1_id.as_raw_id() + 1);
1419
1420        let t1_shared_buffer_batch = {
1421            let mut kv_pairs = (0..10_i32)
1422                .map(|i| {
1423                    gen_key_value(
1424                        i as usize % 2,
1425                        10 - i,
1426                        0,
1427                        i,
1428                        i,
1429                        &t1_pk_serde,
1430                        &t1_pk_indices,
1431                    )
1432                })
1433                .collect_vec();
1434
1435            kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1436            build_batch(kv_pairs.into_iter(), t1_id).unwrap()
1437        };
1438
1439        let t2_shared_buffer_batch = {
1440            let mut kv_pairs = (0..10_i32)
1441                .map(|i| {
1442                    gen_key_value(
1443                        i as usize % 2,
1444                        10 - i,
1445                        0,
1446                        0,
1447                        0,
1448                        &t2_pk_serde,
1449                        &t2_pk_indices,
1450                    )
1451                })
1452                .collect_vec();
1453
1454            kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1455            build_batch(kv_pairs.into_iter(), t2_id).unwrap()
1456        };
1457
1458        let t1_watermark = {
1459            let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
1460            serialize_pk(r1, &watermark_col_serde)
1461        };
1462
1463        let t1_read_watermark = ReadTableWatermark {
1464            direction: WatermarkDirection::Ascending,
1465            vnode_watermarks: BTreeMap::from_iter(
1466                (0..2).map(|i| (VirtualNode::from_index(i), t1_watermark.clone())),
1467            ),
1468        };
1469
1470        let t2_watermark = {
1471            let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
1472            serialize_pk(r1, &watermark_col_serde)
1473        };
1474
1475        let t2_read_watermark = ReadTableWatermark {
1476            direction: WatermarkDirection::Ascending,
1477            vnode_watermarks: BTreeMap::from_iter(
1478                (0..2).map(|i| (VirtualNode::from_index(i), t2_watermark.clone())),
1479            ),
1480        };
1481
1482        {
1483            // test non pk prefix watermark
1484            let t1_iter = t1_shared_buffer_batch.clone().into_forward_iter();
1485            let t2_iter = t2_shared_buffer_batch.clone().into_forward_iter();
1486            let iter_vec = vec![t1_iter, t2_iter];
1487            let merge_iter = MergeIterator::new(iter_vec);
1488
1489            let full_key_filter_key_extractor =
1490                FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
1491
1492            let table_id_to_vnode =
1493                HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));
1494
1495            let table_id_to_watermark_serde = HashMap::from_iter(once((
1496                t1_id,
1497                Some((
1498                    t1_pk_serde.clone(),
1499                    watermark_col_serde.clone(),
1500                    t1_watermark_col_idx_in_pk,
1501                )),
1502            )));
1503
1504            let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
1505                full_key_filter_key_extractor,
1506                table_id_to_vnode,
1507                table_id_to_watermark_serde,
1508                HashMap::default(),
1509            ));
1510
1511            let mut iter = NonPkPrefixSkipWatermarkIterator::new(
1512                merge_iter,
1513                NonPkPrefixSkipWatermarkState::new(
1514                    BTreeMap::from_iter(once((TABLE_ID, t1_read_watermark.clone()))),
1515                    compaction_catalog_agent_ref,
1516                ),
1517            );
1518
1519            iter.rewind().await.unwrap();
1520            assert!(iter.is_valid());
1521            let mut t1_kv_pairs = (5..10_i32)
1522                .map(|i| {
1523                    let (k, v) = gen_key_value(
1524                        i as usize % 2,
1525                        10 - i,
1526                        0,
1527                        i,
1528                        i,
1529                        &t1_pk_serde,
1530                        &t1_pk_indices,
1531                    );
1532                    (k, v)
1533                })
1534                .collect_vec();
1535
1536            t1_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1537
1538            let mut t2_kv_pairs = (0..10_i32)
1539                .map(|i| {
1540                    gen_key_value(
1541                        i as usize % 2,
1542                        10 - i,
1543                        0,
1544                        0,
1545                        0,
1546                        &t2_pk_serde,
1547                        &t2_pk_indices,
1548                    )
1549                })
1550                .collect_vec();
1551
1552            t2_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1553            let mut index = 0;
1554            for _ in 0..t1_kv_pairs.len() {
1555                assert!(t1_kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
1556                iter.next().await.unwrap();
1557                index += 1;
1558            }
1559
1560            assert!(iter.is_valid());
1561            assert_eq!(t1_kv_pairs.len(), index);
1562
1563            index = 0;
1564            for _ in 0..t2_kv_pairs.len() {
1565                assert!(t2_kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
1566                iter.next().await.unwrap();
1567                index += 1;
1568            }
1569
1570            assert!(!iter.is_valid());
1571            assert_eq!(t2_kv_pairs.len(), index);
1572        }
1573
1574        {
1575            let t1_iter = t1_shared_buffer_batch.clone().into_forward_iter();
1576            let t2_iter = t2_shared_buffer_batch.clone().into_forward_iter();
1577            let iter_vec = vec![t1_iter, t2_iter];
1578            let merge_iter = MergeIterator::new(iter_vec);
1579
1580            let full_key_filter_key_extractor =
1581                FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
1582
1583            let table_id_to_vnode = HashMap::from_iter(
1584                vec![
1585                    (t1_id, VirtualNode::COUNT_FOR_TEST),
1586                    (t2_id, VirtualNode::COUNT_FOR_TEST),
1587                ]
1588                .into_iter(),
1589            );
1590
1591            let table_id_to_watermark_serde = HashMap::from_iter(
1592                vec![
1593                    (
1594                        t1_id,
1595                        Some((
1596                            t1_pk_serde.clone(),
1597                            watermark_col_serde.clone(),
1598                            t1_watermark_col_idx_in_pk,
1599                        )),
1600                    ),
1601                    (t2_id, None),
1602                ]
1603                .into_iter(),
1604            );
1605
1606            let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
1607                full_key_filter_key_extractor,
1608                table_id_to_vnode,
1609                table_id_to_watermark_serde,
1610                HashMap::default(),
1611            ));
1612
1613            let non_pk_prefix_iter = NonPkPrefixSkipWatermarkIterator::new(
1614                merge_iter,
1615                NonPkPrefixSkipWatermarkState::new(
1616                    BTreeMap::from_iter(once((t1_id, t1_read_watermark.clone()))),
1617                    compaction_catalog_agent_ref.clone(),
1618                ),
1619            );
1620
1621            let mut mix_iter = PkPrefixSkipWatermarkIterator::new(
1622                non_pk_prefix_iter,
1623                PkPrefixSkipWatermarkState::new(BTreeMap::from_iter(once((
1624                    t2_id,
1625                    t2_read_watermark.clone(),
1626                )))),
1627            );
1628
1629            mix_iter.rewind().await.unwrap();
1630            assert!(mix_iter.is_valid());
1631
1632            let mut t1_kv_pairs = (5..10_i32)
1633                .map(|i| {
1634                    let (k, v) = gen_key_value(
1635                        i as usize % 2,
1636                        10 - i,
1637                        0,
1638                        i,
1639                        i,
1640                        &t1_pk_serde,
1641                        &t1_pk_indices,
1642                    );
1643                    (k, v)
1644                })
1645                .collect_vec();
1646
1647            t1_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1648
1649            let mut t2_kv_pairs = (0..=5_i32)
1650                .map(|i| {
1651                    gen_key_value(
1652                        i as usize % 2,
1653                        10 - i,
1654                        0,
1655                        0,
1656                        0,
1657                        &t2_pk_serde,
1658                        &t2_pk_indices,
1659                    )
1660                })
1661                .collect_vec();
1662
1663            t2_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1664
1665            let mut index = 0;
1666            for _ in 0..t1_kv_pairs.len() {
1667                assert!(
1668                    t1_kv_pairs[index].0.as_ref() == mix_iter.key().user_key.table_key.as_ref()
1669                );
1670                mix_iter.next().await.unwrap();
1671                index += 1;
1672            }
1673
1674            assert!(mix_iter.is_valid());
1675            assert_eq!(t1_kv_pairs.len(), index);
1676
1677            index = 0;
1678
1679            for _ in 0..t2_kv_pairs.len() {
1680                assert!(
1681                    t2_kv_pairs[index].0.as_ref() == mix_iter.key().user_key.table_key.as_ref()
1682                );
1683                mix_iter.next().await.unwrap();
1684                index += 1;
1685            }
1686
1687            assert!(!mix_iter.is_valid());
1688            assert_eq!(t2_kv_pairs.len(), index);
1689        }
1690    }
1691}