// risingwave_storage/hummock/iterator/skip_watermark.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::{BTreeMap, VecDeque};
17
18use bytes::Bytes;
19use risingwave_common::catalog::TableId;
20use risingwave_common::hash::VirtualNode;
21use risingwave_common::row::Row;
22use risingwave_common::types::Datum;
23use risingwave_common::util::row_serde::OrderedRowSerde;
24use risingwave_hummock_sdk::compaction_group::hummock_version_ext::safe_epoch_read_table_watermarks_impl;
25use risingwave_hummock_sdk::key::FullKey;
26use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
27use risingwave_hummock_sdk::table_watermark::{
28    ReadTableWatermark, TableWatermarks, WatermarkDirection,
29};
30
31use super::SkipWatermarkState;
32use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
33use crate::hummock::HummockResult;
34use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
35use crate::hummock::value::HummockValue;
36use crate::monitor::StoreLocalStatistic;
37
/// A forward iterator adapter that transparently skips key-value pairs
/// filtered out by table watermarks, while accumulating per-table statistics
/// about the skipped entries.
pub struct SkipWatermarkIterator<I, S> {
    /// The wrapped forward iterator supplying the raw key-value stream.
    inner: I,
    /// Watermark-tracking state that decides which keys should be skipped.
    state: S,
    /// The stats of skipped key-value pairs for each table.
    skipped_entry_table_stats: TableStatsMap,
    /// The id of table currently undergoing processing.
    last_table_id: Option<TableId>,
    /// The stats of table currently undergoing processing.
    last_table_stats: TableStats,
}
48
impl<I: HummockIterator<Direction = Forward>, S: SkipWatermarkState> SkipWatermarkIterator<I, S> {
    /// Wraps `inner` with watermark-skipping behavior driven by `state`.
    pub fn new(inner: I, state: S) -> Self {
        Self {
            inner,
            state,
            skipped_entry_table_stats: TableStatsMap::default(),
            last_table_id: None,
            last_table_stats: TableStats::default(),
        }
    }

    /// Rebuilds the remaining-watermark queue held by `state`.
    fn reset_watermark(&mut self) {
        self.state.reset_watermark();
    }

    /// Clears all accumulated skipped-entry statistics.
    fn reset_skipped_entry_table_stats(&mut self) {
        self.skipped_entry_table_stats = TableStatsMap::default();
        self.last_table_id = None;
        self.last_table_stats = TableStats::default();
    }

    /// Advance the key until iterator invalid or the current key will not be filtered by the latest watermark.
    /// Calling this method should ensure that the first remaining watermark has been advanced to the current key.
    ///
    /// Every skipped entry is charged as a negative delta against the stats of
    /// the table it belongs to; the per-table accumulator is flushed whenever
    /// the table id changes and once more when the loop ends.
    async fn advance_key_and_watermark(&mut self) -> HummockResult<()> {
        // advance key and watermark in an interleave manner until nothing
        // changed after the method is called.
        while self.inner.is_valid() {
            if !self.state.should_delete(&self.inner.key()) {
                break;
            }

            // The current key is about to be skipped; if it belongs to a
            // different table than the previously skipped key, flush the
            // per-table accumulator before switching tables.
            if self
                .last_table_id
                .is_none_or(|last_table_id| last_table_id != self.inner.key().user_key.table_id)
            {
                self.add_last_table_stats();
                self.last_table_id = Some(self.inner.key().user_key.table_id);
            }
            // Record the skipped entry as negative deltas (key count, key
            // bytes, value bytes) for the current table.
            self.last_table_stats.total_key_count -= 1;
            self.last_table_stats.total_key_size -= self.inner.key().encoded_len() as i64;
            self.last_table_stats.total_value_size -= self.inner.value().encoded_len() as i64;

            self.inner.next().await?;
        }
        self.add_last_table_stats();
        Ok(())
    }

    /// Merges the stats accumulated for the last processed table into
    /// `skipped_entry_table_stats` and resets the per-table accumulator.
    /// No-op when no table is currently being processed.
    fn add_last_table_stats(&mut self) {
        let Some(last_table_id) = self.last_table_id.take() else {
            return;
        };
        let delta = std::mem::take(&mut self.last_table_stats);
        let e = self
            .skipped_entry_table_stats
            .entry(last_table_id)
            .or_default();
        e.total_key_count += delta.total_key_count;
        e.total_key_size += delta.total_key_size;
        e.total_value_size += delta.total_value_size;
    }
}
113
114impl<I: HummockIterator<Direction = Forward>, S: SkipWatermarkState> HummockIterator
115    for SkipWatermarkIterator<I, S>
116{
117    type Direction = Forward;
118
119    async fn next(&mut self) -> HummockResult<()> {
120        self.inner.next().await?;
121        // Check whether there is any remaining watermark and return early to
122        // avoid calling the async `advance_key_and_watermark`, since in benchmark
123        // performance downgrade is observed without this early return.
124        if self.state.has_watermark() {
125            self.advance_key_and_watermark().await?;
126        }
127        Ok(())
128    }
129
130    fn key(&self) -> FullKey<&[u8]> {
131        self.inner.key()
132    }
133
134    fn value(&self) -> HummockValue<&[u8]> {
135        self.inner.value()
136    }
137
138    fn is_valid(&self) -> bool {
139        self.inner.is_valid()
140    }
141
142    async fn rewind(&mut self) -> HummockResult<()> {
143        self.reset_watermark();
144        self.reset_skipped_entry_table_stats();
145        self.inner.rewind().await?;
146        self.advance_key_and_watermark().await?;
147        Ok(())
148    }
149
150    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
151        self.reset_watermark();
152        self.reset_skipped_entry_table_stats();
153        self.inner.seek(key).await?;
154        self.advance_key_and_watermark().await?;
155        Ok(())
156    }
157
158    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
159        add_table_stats_map(
160            &mut stats.skipped_by_watermark_table_stats,
161            &self.skipped_entry_table_stats,
162        );
163        self.inner.collect_local_statistic(stats)
164    }
165
166    fn value_meta(&self) -> ValueMeta {
167        self.inner.value_meta()
168    }
169}
/// Skip-watermark state for tables whose watermark is compared directly
/// against the raw inner-key bytes (the pk-prefix case: no row
/// deserialization is needed, see `key_filter_by_watermark` usage below).
pub struct PkPrefixSkipWatermarkState {
    /// All read watermarks, kept so `remain_watermarks` can be rebuilt on reset.
    watermarks: BTreeMap<TableId, ReadTableWatermark>,
    /// Watermarks not yet passed by the iterator, ordered by `(table_id, vnode)`.
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Bytes)>,
}
174
impl SkipWatermarkState for PkPrefixSkipWatermarkState {
    #[inline(always)]
    fn has_watermark(&self) -> bool {
        !self.remain_watermarks.is_empty()
    }

    /// Returns whether `key` should be skipped according to the front-most
    /// remaining watermark. May pop exhausted watermarks as a side effect
    /// (via `advance_watermark`) when the key has moved past the front entry.
    fn should_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
        if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            let key_table_id = key.user_key.table_id;
            let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
            match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
                Ordering::Less => {
                    // The key has not reached the front watermark's
                    // (table, vnode) yet, so it cannot be filtered.
                    return false;
                }
                Ordering::Equal => {
                    return direction.key_filter_by_watermark(inner_key, watermark);
                }
                Ordering::Greater => {
                    // The current key has advanced over the watermark.
                    // We may advance the watermark before advancing the key.
                    return self.advance_watermark(key);
                }
            }
        }
        false
    }

    /// Rebuilds `remain_watermarks` from `watermarks`. The result is sorted by
    /// `(table_id, vnode)` because both the outer `BTreeMap` and the inner
    /// `vnode_watermarks` map iterate in key order.
    fn reset_watermark(&mut self) {
        self.remain_watermarks = self
            .watermarks
            .iter()
            .flat_map(|(table_id, read_watermarks)| {
                read_watermarks
                    .vnode_watermarks
                    .iter()
                    .map(|(vnode, watermarks)| {
                        (
                            *table_id,
                            *vnode,
                            read_watermarks.direction,
                            watermarks.clone(),
                        )
                    })
            })
            .collect();
    }

    /// Advance watermark until no watermark remains or the first watermark can possibly
    /// filter out the current or future key.
    ///
    /// Return a flag indicating whether the current key will be filtered by the current watermark.
    fn advance_watermark(&mut self, key: &FullKey<&[u8]>) -> bool {
        let key_table_id = key.user_key.table_id;
        let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
        while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
                Ordering::Less => {
                    // This watermark's (table, vnode) lies entirely behind the
                    // key; it can never match again, so drop it.
                    self.remain_watermarks.pop_front();
                    continue;
                }
                Ordering::Equal => {
                    match direction {
                        WatermarkDirection::Ascending => {
                            match inner_key.cmp(watermark.as_ref()) {
                                Ordering::Less => {
                                    // The current key will be filtered by the watermark.
                                    // Return true to further advance the key.
                                    return true;
                                }
                                Ordering::Equal | Ordering::Greater => {
                                    // The current key has passed the watermark.
                                    // Advance the next watermark.
                                    self.remain_watermarks.pop_front();
                                    // Since it is impossible for a (table_id, vnode) tuple to have multiple
                                    // watermark, after the pop_front, the next (table_id, vnode) must have
                                    // exceeded the current key, and we can directly return and mark that the
                                    // current key is not filtered by the watermark at the front.
                                    #[cfg(debug_assertions)]
                                    {
                                        if let Some((next_table_id, next_vnode, _, _)) =
                                            self.remain_watermarks.front()
                                        {
                                            assert!(
                                                (next_table_id, next_vnode)
                                                    > (&key_table_id, &key_vnode)
                                            );
                                        }
                                    }
                                    return false;
                                }
                            }
                        }
                        WatermarkDirection::Descending => {
                            return match inner_key.cmp(watermark.as_ref()) {
                                // Current key as not reached the watermark. Just return.
                                Ordering::Less | Ordering::Equal => false,
                                // Current key will be filtered by the watermark.
                                // Return true to further advance the key.
                                Ordering::Greater => true,
                            };
                        }
                    }
                }
                Ordering::Greater => {
                    // The front watermark is ahead of the key; nothing to do yet.
                    return false;
                }
            }
        }
        false
    }
}
286
287impl PkPrefixSkipWatermarkState {
288    pub fn new(watermarks: BTreeMap<TableId, ReadTableWatermark>) -> Self {
289        Self {
290            remain_watermarks: VecDeque::new(),
291            watermarks,
292        }
293    }
294
295    pub fn from_safe_epoch_watermarks(
296        safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
297    ) -> Self {
298        let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
299        Self::new(watermarks)
300    }
301}
302
/// Skip-watermark state for tables whose watermark column is *not* the key
/// prefix: keys must be deserialized with the table's pk serde so the
/// watermark column can be extracted and compared as a `Datum`.
pub struct NonPkPrefixSkipWatermarkState {
    /// All read watermarks, kept so `remain_watermarks` can be rebuilt on reset.
    watermarks: BTreeMap<TableId, ReadTableWatermark>,
    /// Watermarks not yet passed, ordered by `(table_id, vnode)`; the `Datum`
    /// is the deserialized watermark column value.
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Datum)>,
    /// Resolves per-table serde info via `watermark_serde`.
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,

    /// Cached `(pk serde, watermark-column serde, watermark col index in pk)`
    /// for the table recorded in `last_table_id`.
    last_serde: Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
    /// Table id the cached `last_serde` was fetched for.
    last_table_id: Option<TableId>,
}
311
312impl NonPkPrefixSkipWatermarkState {
313    pub fn new(
314        watermarks: BTreeMap<TableId, ReadTableWatermark>,
315        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
316    ) -> Self {
317        Self {
318            remain_watermarks: VecDeque::new(),
319            watermarks,
320            compaction_catalog_agent_ref,
321            last_serde: None,
322            last_table_id: None,
323        }
324    }
325
326    pub fn from_safe_epoch_watermarks(
327        safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
328        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
329    ) -> Self {
330        let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
331        Self::new(watermarks, compaction_catalog_agent_ref)
332    }
333}
334
335impl SkipWatermarkState for NonPkPrefixSkipWatermarkState {
336    #[inline(always)]
337    fn has_watermark(&self) -> bool {
338        !self.remain_watermarks.is_empty()
339    }
340
341    fn should_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
342        if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
343            let key_table_id = key.user_key.table_id;
344            {
345                if self
346                    .last_table_id
347                    .is_none_or(|last_table_id| last_table_id != key_table_id)
348                {
349                    self.last_table_id = Some(key_table_id);
350                    self.last_serde = self.compaction_catalog_agent_ref.watermark_serde(*table_id);
351                }
352            }
353
354            let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
355            match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
356                Ordering::Less => {
357                    return false;
358                }
359                Ordering::Equal => {
360                    let (pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk) =
361                        self.last_serde.as_ref().unwrap();
362                    let row = pk_prefix_serde
363                        .deserialize(inner_key)
364                        .unwrap_or_else(|_| {
365                            panic!("Failed to deserialize pk_prefix inner_key {:?} serde data_types {:?} order_types {:?}", inner_key, pk_prefix_serde.get_data_types(), pk_prefix_serde.get_order_types());
366                        });
367                    let watermark_col_in_pk = row.datum_at(*watermark_col_idx_in_pk);
368                    return direction.datum_filter_by_watermark(
369                        watermark_col_in_pk,
370                        watermark,
371                        watermark_col_serde.get_order_types()[0],
372                    );
373                }
374                Ordering::Greater => {
375                    // The current key has advanced over the watermark.
376                    // We may advance the watermark before advancing the key.
377                    return self.advance_watermark(key);
378                }
379            }
380        }
381        false
382    }
383
384    fn reset_watermark(&mut self) {
385        self.remain_watermarks = self
386            .watermarks
387            .iter()
388            .flat_map(|(table_id, read_watermarks)| {
389                let watermark_serde = self.compaction_catalog_agent_ref.watermark_serde(*table_id).map(|(_pk_serde, watermark_serde, _watermark_col_idx_in_pk)| watermark_serde);
390
391                read_watermarks
392                    .vnode_watermarks
393                    .iter()
394                    .map(move |(vnode, watermarks)| {
395                        (
396                            *table_id,
397                            *vnode,
398                            read_watermarks.direction,
399                            {
400                                let watermark_serde = watermark_serde.as_ref().unwrap();
401                                let row = watermark_serde
402                                .deserialize(watermarks).unwrap_or_else(|_| {
403                                    panic!("Failed to deserialize watermark {:?} serde data_types {:?} order_types {:?}", watermarks, watermark_serde.get_data_types(), watermark_serde.get_order_types());
404                                });
405                                row[0].clone()
406                            },
407                        )
408                    })
409            })
410            .collect();
411    }
412
413    fn advance_watermark(&mut self, key: &FullKey<&[u8]>) -> bool {
414        let key_table_id = key.user_key.table_id;
415        let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
416        while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
417            match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
418                Ordering::Less => {
419                    self.remain_watermarks.pop_front();
420                    continue;
421                }
422                Ordering::Equal => {
423                    let (pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk) =
424                        self.last_serde.as_ref().unwrap();
425
426                    let row = pk_prefix_serde
427                        .deserialize(inner_key)
428                        .unwrap_or_else(|_| {
429                            panic!("Failed to deserialize pk_prefix inner_key {:?} serde data_types {:?} order_types {:?}", inner_key, pk_prefix_serde.get_data_types(), pk_prefix_serde.get_order_types());
430                        });
431                    let watermark_col_in_pk = row.datum_at(*watermark_col_idx_in_pk);
432
433                    return direction.datum_filter_by_watermark(
434                        watermark_col_in_pk,
435                        watermark,
436                        watermark_col_serde.get_order_types()[0],
437                    );
438                }
439                Ordering::Greater => {
440                    return false;
441                }
442            }
443        }
444        false
445    }
446}
447
/// [`SkipWatermarkIterator`] specialized for pk-prefix watermark comparison.
pub type PkPrefixSkipWatermarkIterator<I> = SkipWatermarkIterator<I, PkPrefixSkipWatermarkState>;

/// [`SkipWatermarkIterator`] specialized for non-pk-prefix watermark comparison.
pub type NonPkPrefixSkipWatermarkIterator<I> =
    SkipWatermarkIterator<I, NonPkPrefixSkipWatermarkState>;
452
453#[cfg(test)]
454mod tests {
455    use std::collections::{BTreeMap, HashMap};
456    use std::iter::{empty, once};
457    use std::sync::Arc;
458
459    use bytes::Bytes;
460    use itertools::Itertools;
461    use risingwave_common::catalog::TableId;
462    use risingwave_common::hash::VirtualNode;
463    use risingwave_common::row::{OwnedRow, RowExt};
464    use risingwave_common::types::{DataType, ScalarImpl};
465    use risingwave_common::util::epoch::test_epoch;
466    use risingwave_common::util::row_serde::OrderedRowSerde;
467    use risingwave_common::util::sort_util::OrderType;
468    use risingwave_hummock_sdk::EpochWithGap;
469    use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey, gen_key_from_str};
470    use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, WatermarkDirection};
471
472    use super::PkPrefixSkipWatermarkState;
473    use crate::compaction_catalog_manager::{
474        CompactionCatalogAgent, FilterKeyExtractorImpl, FullKeyFilterKeyExtractor,
475    };
476    use crate::hummock::iterator::{
477        HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
478        NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator,
479    };
480    use crate::hummock::shared_buffer::shared_buffer_batch::{
481        SharedBufferBatch, SharedBufferValue,
482    };
483    use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode};
484
    /// Epoch used for every test key-value pair in this module.
    const EPOCH: u64 = test_epoch(1);
    /// Table id shared by all tests in this module.
    const TABLE_ID: TableId = TableId::new(233);
487
    /// Asserts that `second` yields exactly the same key-value sequence as
    /// `first` (when present). When `seek_key = Some((vnode, key_index))`,
    /// both iterators are first sought to the generated key; otherwise both
    /// are rewound. `first == None` means the expected sequence is empty, so
    /// `second` must be immediately exhausted.
    async fn assert_iter_eq(
        mut first: Option<impl HummockIterator>,
        mut second: impl HummockIterator,
        seek_key: Option<(usize, usize)>,
    ) {
        if let Some((vnode, key_index)) = seek_key {
            let (seek_key, _) = gen_key_value(vnode, key_index);
            let full_key = FullKey {
                user_key: UserKey {
                    table_id: TABLE_ID,
                    table_key: seek_key,
                },
                epoch_with_gap: EpochWithGap::new_from_epoch(EPOCH),
            };
            if let Some(first) = &mut first {
                first.seek(full_key.to_ref()).await.unwrap();
            }
            second.seek(full_key.to_ref()).await.unwrap()
        } else {
            if let Some(first) = &mut first {
                first.rewind().await.unwrap();
            }
            second.rewind().await.unwrap();
        }

        // Lock-step comparison: every entry of `first` must match `second`.
        if let Some(first) = &mut first {
            while first.is_valid() {
                assert!(second.is_valid());
                let first_key = first.key();
                let second_key = second.key();
                assert_eq!(first_key, second_key);
                assert_eq!(first.value(), second.value());
                first.next().await.unwrap();
                second.next().await.unwrap();
            }
        }
        // `second` must not yield entries beyond those of `first`.
        assert!(!second.is_valid());
    }
526
527    fn build_batch(
528        pairs: impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)>,
529        table_id: TableId,
530    ) -> Option<SharedBufferBatch> {
531        let pairs: Vec<_> = pairs.collect();
532        if pairs.is_empty() {
533            None
534        } else {
535            Some(SharedBufferBatch::for_test(pairs, EPOCH, table_id))
536        }
537    }
538
539    fn filter_with_watermarks(
540        iter: impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)>,
541        table_watermarks: ReadTableWatermark,
542    ) -> impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)> {
543        iter.filter(move |(key, _)| {
544            if let Some(watermark) = table_watermarks.vnode_watermarks.get(&key.vnode_part()) {
545                !table_watermarks
546                    .direction
547                    .key_filter_by_watermark(key.key_part(), watermark)
548            } else {
549                true
550            }
551        })
552    }
553
554    fn gen_inner_key(index: usize) -> String {
555        format!("key{:5}", index)
556    }
557
    /// End-to-end harness: builds a batch over a fixed key set, applies
    /// `watermarks` (given as `(vnode, key_index)` pairs) in `direction`, and
    /// asserts that the skip-watermark iterator matches a reference iterator
    /// built from pre-filtered data — after rewind, after seeking to every
    /// existing key, and after seeking past the last key.
    async fn test_watermark(
        watermarks: impl IntoIterator<Item = (usize, usize)>,
        direction: WatermarkDirection,
    ) {
        // (vnode, key_index) pairs present in the test batch.
        let test_index: [(usize, usize); 7] =
            [(0, 2), (0, 3), (0, 4), (1, 1), (1, 3), (4, 2), (8, 1)];
        let items = test_index
            .iter()
            .map(|(vnode, key_index)| gen_key_value(*vnode, *key_index))
            .collect_vec();

        let read_watermark = ReadTableWatermark {
            direction,
            vnode_watermarks: BTreeMap::from_iter(watermarks.into_iter().map(
                |(vnode, key_index)| {
                    (
                        VirtualNode::from_index(vnode),
                        Bytes::from(gen_inner_key(key_index)),
                    )
                },
            )),
        };

        let gen_iters = || {
            // Reference: batch built from data filtered up-front by the watermark.
            let batch = build_batch(
                filter_with_watermarks(items.clone().into_iter(), read_watermark.clone()),
                TABLE_ID,
            );

            // Under test: the unfiltered batch wrapped in the skip-watermark iterator.
            let iter = PkPrefixSkipWatermarkIterator::new(
                build_batch(items.clone().into_iter(), TABLE_ID)
                    .unwrap()
                    .into_forward_iter(),
                PkPrefixSkipWatermarkState::new(BTreeMap::from_iter(once((
                    TABLE_ID,
                    read_watermark.clone(),
                )))),
            );
            (batch.map(|batch| batch.into_forward_iter()), iter)
        };
        let (first, second) = gen_iters();
        assert_iter_eq(first, second, None).await;
        for (vnode, key_index) in &test_index {
            let (first, second) = gen_iters();
            assert_iter_eq(first, second, Some((*vnode, *key_index))).await;
        }
        // Seeking past the last key must exhaust both iterators.
        let (last_vnode, last_key_index) = test_index.last().unwrap();
        let (first, second) = gen_iters();
        assert_iter_eq(first, second, Some((*last_vnode, last_key_index + 1))).await;
    }
608
609    fn gen_key_value(vnode: usize, index: usize) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
610        (
611            gen_key_from_str(VirtualNode::from_index(vnode), &gen_inner_key(index)),
612            SharedBufferValue::Insert(Bytes::copy_from_slice(
613                format!("{}-value-{}", vnode, index).as_bytes(),
614            )),
615        )
616    }
617
    /// With no watermarks at all, the wrapped iterator must pass through unchanged.
    #[tokio::test]
    async fn test_no_watermark() {
        test_watermark(empty(), WatermarkDirection::Ascending).await;
        test_watermark(empty(), WatermarkDirection::Descending).await;
    }
623
    /// A watermark below (ascending) / above (descending) all keys filters nothing.
    #[tokio::test]
    async fn test_too_low_watermark() {
        test_watermark(vec![(0, 0)], WatermarkDirection::Ascending).await;
        test_watermark(vec![(0, 10)], WatermarkDirection::Descending).await;
    }
629
    /// A single mid-range watermark on one vnode, in both directions.
    #[tokio::test]
    async fn test_single_watermark() {
        test_watermark(vec![(0, 3)], WatermarkDirection::Ascending).await;
        test_watermark(vec![(0, 3)], WatermarkDirection::Descending).await;
    }
635
    /// A watermark on a vnode that holds no data must not disturb other vnodes.
    #[tokio::test]
    async fn test_watermark_vnode_no_data() {
        test_watermark(vec![(3, 3)], WatermarkDirection::Ascending).await;
        test_watermark(vec![(3, 3)], WatermarkDirection::Descending).await;
    }
641
    /// Watermarks that filter out every key in every populated vnode: the
    /// resulting iterator must yield nothing.
    #[tokio::test]
    async fn test_filter_all() {
        test_watermark(
            vec![(0, 5), (1, 4), (2, 0), (4, 3), (8, 2)],
            WatermarkDirection::Ascending,
        )
        .await;
        test_watermark(
            vec![(0, 0), (1, 0), (2, 0), (4, 0), (8, 0)],
            WatermarkDirection::Descending,
        )
        .await;
    }
655
    /// Exercises advancing the watermark queue across multiple vnodes,
    /// including one (vnode 8) whose watermark filters nothing.
    #[tokio::test]
    async fn test_advance_multi_vnode() {
        test_watermark(vec![(1, 2), (8, 0)], WatermarkDirection::Ascending).await;
    }
660
661    #[tokio::test]
662    async fn test_non_pk_prefix_watermark() {
663        fn gen_key_value(
664            vnode: usize,
665            col_0: i32,
666            col_1: i32,
667            col_2: i32,
668            col_3: i32,
669            pk_serde: &OrderedRowSerde,
670            pk_indices: &[usize],
671        ) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
672            let r = OwnedRow::new(vec![
673                Some(ScalarImpl::Int32(col_0)),
674                Some(ScalarImpl::Int32(col_1)),
675                Some(ScalarImpl::Int32(col_2)), // watermark column
676                Some(ScalarImpl::Int32(col_3)),
677            ]);
678
679            let pk = r.project(pk_indices);
680
681            let k1 = serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode));
682            let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice(
683                format!("{}-value-{}", vnode, col_2).as_bytes(),
684            ));
685            (k1, v1)
686        }
687
688        let watermark_direction = WatermarkDirection::Ascending;
689
690        let watermark_col_serde =
691            OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]);
692        let pk_serde = OrderedRowSerde::new(
693            vec![DataType::Int32, DataType::Int32, DataType::Int32],
694            vec![
695                OrderType::ascending(),
696                OrderType::ascending(),
697                OrderType::ascending(),
698            ],
699        );
700
701        let pk_indices = vec![0, 2, 3];
702        let watermark_col_idx_in_pk = 1;
703
704        {
705            // test single vnode
706            let shared_buffer_batch = {
707                let mut kv_pairs = (0..10)
708                    .map(|i| gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices))
709                    .collect_vec();
710                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
711                build_batch(kv_pairs.into_iter(), TABLE_ID)
712            }
713            .unwrap();
714
715            {
716                // empty read watermark
717                let read_watermark = ReadTableWatermark {
718                    direction: watermark_direction,
719                    vnode_watermarks: BTreeMap::default(),
720                };
721
722                let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![TABLE_ID]);
723
724                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
725                    shared_buffer_batch.clone().into_forward_iter(),
726                    NonPkPrefixSkipWatermarkState::new(
727                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
728                        compaction_catalog_agent_ref,
729                    ),
730                );
731
732                iter.rewind().await.unwrap();
733                assert!(iter.is_valid());
734                for i in 0..10 {
735                    let (k, _v) = gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices);
736                    assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref());
737                    iter.next().await.unwrap();
738                }
739                assert!(!iter.is_valid());
740            }
741
742            {
743                // test watermark
744                let watermark = {
745                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
746                    serialize_pk(r1, &watermark_col_serde)
747                };
748
749                let read_watermark = ReadTableWatermark {
750                    direction: watermark_direction,
751                    vnode_watermarks: BTreeMap::from_iter(once((
752                        VirtualNode::from_index(0),
753                        watermark.clone(),
754                    ))),
755                };
756
757                let full_key_filter_key_extractor =
758                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
759
760                let table_id_to_vnode =
761                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));
762
763                let table_id_to_watermark_serde = HashMap::from_iter(once((
764                    TABLE_ID,
765                    Some((
766                        pk_serde.clone(),
767                        watermark_col_serde.clone(),
768                        watermark_col_idx_in_pk,
769                    )),
770                )));
771
772                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
773                    full_key_filter_key_extractor,
774                    table_id_to_vnode,
775                    table_id_to_watermark_serde,
776                ));
777
778                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
779                    shared_buffer_batch.clone().into_forward_iter(),
780                    NonPkPrefixSkipWatermarkState::new(
781                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
782                        compaction_catalog_agent_ref,
783                    ),
784                );
785
786                iter.rewind().await.unwrap();
787                assert!(iter.is_valid());
788                for i in 5..10 {
789                    let (k, _v) = gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices);
790                    assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref());
791                    iter.next().await.unwrap();
792                }
793                assert!(!iter.is_valid());
794            }
795        }
796
797        {
798            // test multi vnode
799            let shared_buffer_batch = {
800                let mut kv_pairs = (0..10_i32)
801                    .map(|i| gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices))
802                    .collect_vec();
803
804                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
805                build_batch(kv_pairs.into_iter(), TABLE_ID)
806            };
807
808            {
809                // test watermark
810                let watermark = {
811                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
812                    serialize_pk(r1, &watermark_col_serde)
813                };
814
815                let read_watermark = ReadTableWatermark {
816                    direction: watermark_direction,
817                    vnode_watermarks: BTreeMap::from_iter(
818                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
819                    ),
820                };
821
822                let full_key_filter_key_extractor =
823                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
824
825                let table_id_to_vnode =
826                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));
827
828                let table_id_to_watermark_serde = HashMap::from_iter(once((
829                    TABLE_ID,
830                    Some((
831                        pk_serde.clone(),
832                        watermark_col_serde.clone(),
833                        watermark_col_idx_in_pk,
834                    )),
835                )));
836
837                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
838                    full_key_filter_key_extractor,
839                    table_id_to_vnode,
840                    table_id_to_watermark_serde,
841                ));
842
843                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
844                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
845                    NonPkPrefixSkipWatermarkState::new(
846                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
847                        compaction_catalog_agent_ref,
848                    ),
849                );
850
851                iter.rewind().await.unwrap();
852                assert!(iter.is_valid());
853                let mut kv_pairs = (5..10_i32)
854                    .map(|i| {
855                        let (k, v) =
856                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
857                        (k, v)
858                    })
859                    .collect_vec();
860                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
861                let mut index = 0;
862                while iter.is_valid() {
863                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
864                    iter.next().await.unwrap();
865                    index += 1;
866                }
867            }
868
869            {
870                // test watermark
871                let watermark = {
872                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
873                    serialize_pk(r1, &watermark_col_serde)
874                };
875
876                let read_watermark = ReadTableWatermark {
877                    direction: watermark_direction,
878                    vnode_watermarks: BTreeMap::from_iter(
879                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
880                    ),
881                };
882
883                let full_key_filter_key_extractor =
884                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
885
886                let table_id_to_vnode =
887                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));
888
889                let table_id_to_watermark_serde = HashMap::from_iter(once((
890                    TABLE_ID,
891                    Some((
892                        pk_serde.clone(),
893                        watermark_col_serde.clone(),
894                        watermark_col_idx_in_pk,
895                    )),
896                )));
897
898                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
899                    full_key_filter_key_extractor,
900                    table_id_to_vnode,
901                    table_id_to_watermark_serde,
902                ));
903
904                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
905                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
906                    NonPkPrefixSkipWatermarkState::new(
907                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
908                        compaction_catalog_agent_ref,
909                    ),
910                );
911
912                iter.rewind().await.unwrap();
913                assert!(iter.is_valid());
914                let mut kv_pairs = (5..10_i32)
915                    .map(|i| {
916                        let (k, v) =
917                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
918                        (k, v)
919                    })
920                    .collect_vec();
921                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
922                let mut index = 0;
923                while iter.is_valid() {
924                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
925                    iter.next().await.unwrap();
926                    index += 1;
927                }
928            }
929
930            {
931                // test watermark Descending
932                let watermark = {
933                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
934                    serialize_pk(r1, &watermark_col_serde)
935                };
936
937                let read_watermark = ReadTableWatermark {
938                    direction: WatermarkDirection::Descending,
939                    vnode_watermarks: BTreeMap::from_iter(
940                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
941                    ),
942                };
943
944                let full_key_filter_key_extractor =
945                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
946
947                let table_id_to_vnode =
948                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));
949
950                let table_id_to_watermark_serde = HashMap::from_iter(once((
951                    TABLE_ID,
952                    Some((
953                        pk_serde.clone(),
954                        watermark_col_serde.clone(),
955                        watermark_col_idx_in_pk,
956                    )),
957                )));
958
959                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
960                    full_key_filter_key_extractor,
961                    table_id_to_vnode,
962                    table_id_to_watermark_serde,
963                ));
964
965                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
966                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
967                    NonPkPrefixSkipWatermarkState::new(
968                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
969                        compaction_catalog_agent_ref,
970                    ),
971                );
972
973                iter.rewind().await.unwrap();
974                assert!(iter.is_valid());
975                let mut kv_pairs = (0..=5_i32)
976                    .map(|i| {
977                        let (k, v) =
978                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
979                        (k, v)
980                    })
981                    .collect_vec();
982                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
983                let mut index = 0;
984                while iter.is_valid() {
985                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
986                    iter.next().await.unwrap();
987                    index += 1;
988                }
989            }
990        }
991    }
992
993    #[tokio::test]
994    async fn test_mix_watermark() {
995        fn gen_key_value(
996            vnode: usize,
997            col_0: i32,
998            col_1: i32,
999            col_2: i32,
1000            col_3: i32,
1001            pk_serde: &OrderedRowSerde,
1002            pk_indices: &[usize],
1003        ) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
1004            let r = OwnedRow::new(vec![
1005                Some(ScalarImpl::Int32(col_0)),
1006                Some(ScalarImpl::Int32(col_1)),
1007                Some(ScalarImpl::Int32(col_2)), // watermark column
1008                Some(ScalarImpl::Int32(col_3)),
1009            ]);
1010
1011            let pk = r.project(pk_indices);
1012
1013            let k1 = serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode));
1014            let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice(
1015                format!("{}-value-{}-{}-{}-{}", vnode, col_0, col_1, col_2, col_3).as_bytes(),
1016            ));
1017
1018            (k1, v1)
1019        }
1020
1021        let watermark_col_serde =
1022            OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]);
1023        let t1_pk_serde = OrderedRowSerde::new(
1024            vec![DataType::Int32, DataType::Int32, DataType::Int32],
1025            vec![
1026                OrderType::ascending(),
1027                OrderType::ascending(),
1028                OrderType::ascending(),
1029            ],
1030        );
1031
1032        let t1_pk_indices = vec![0, 2, 3];
1033        let t1_watermark_col_idx_in_pk = 1;
1034
1035        let t2_pk_indices = vec![0, 1, 2];
1036
1037        let t2_pk_serde = OrderedRowSerde::new(
1038            vec![DataType::Int32, DataType::Int32, DataType::Int32],
1039            vec![
1040                OrderType::ascending(),
1041                OrderType::ascending(),
1042                OrderType::ascending(),
1043            ],
1044        );
1045
1046        let t1_id = TABLE_ID;
1047        let t2_id = TableId::from(t1_id.as_raw_id() + 1);
1048
1049        let t1_shared_buffer_batch = {
1050            let mut kv_pairs = (0..10_i32)
1051                .map(|i| {
1052                    gen_key_value(
1053                        i as usize % 2,
1054                        10 - i,
1055                        0,
1056                        i,
1057                        i,
1058                        &t1_pk_serde,
1059                        &t1_pk_indices,
1060                    )
1061                })
1062                .collect_vec();
1063
1064            kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1065            build_batch(kv_pairs.into_iter(), t1_id).unwrap()
1066        };
1067
1068        let t2_shared_buffer_batch = {
1069            let mut kv_pairs = (0..10_i32)
1070                .map(|i| {
1071                    gen_key_value(
1072                        i as usize % 2,
1073                        10 - i,
1074                        0,
1075                        0,
1076                        0,
1077                        &t2_pk_serde,
1078                        &t2_pk_indices,
1079                    )
1080                })
1081                .collect_vec();
1082
1083            kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1084            build_batch(kv_pairs.into_iter(), t2_id).unwrap()
1085        };
1086
1087        let t1_watermark = {
1088            let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
1089            serialize_pk(r1, &watermark_col_serde)
1090        };
1091
1092        let t1_read_watermark = ReadTableWatermark {
1093            direction: WatermarkDirection::Ascending,
1094            vnode_watermarks: BTreeMap::from_iter(
1095                (0..2).map(|i| (VirtualNode::from_index(i), t1_watermark.clone())),
1096            ),
1097        };
1098
1099        let t2_watermark = {
1100            let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
1101            serialize_pk(r1, &watermark_col_serde)
1102        };
1103
1104        let t2_read_watermark = ReadTableWatermark {
1105            direction: WatermarkDirection::Ascending,
1106            vnode_watermarks: BTreeMap::from_iter(
1107                (0..2).map(|i| (VirtualNode::from_index(i), t2_watermark.clone())),
1108            ),
1109        };
1110
1111        {
1112            // test non pk prefix watermark
1113            let t1_iter = t1_shared_buffer_batch.clone().into_forward_iter();
1114            let t2_iter = t2_shared_buffer_batch.clone().into_forward_iter();
1115            let iter_vec = vec![t1_iter, t2_iter];
1116            let merge_iter = MergeIterator::new(iter_vec);
1117
1118            let full_key_filter_key_extractor =
1119                FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
1120
1121            let table_id_to_vnode =
1122                HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));
1123
1124            let table_id_to_watermark_serde = HashMap::from_iter(once((
1125                t1_id,
1126                Some((
1127                    t1_pk_serde.clone(),
1128                    watermark_col_serde.clone(),
1129                    t1_watermark_col_idx_in_pk,
1130                )),
1131            )));
1132
1133            let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
1134                full_key_filter_key_extractor,
1135                table_id_to_vnode,
1136                table_id_to_watermark_serde,
1137            ));
1138
1139            let mut iter = NonPkPrefixSkipWatermarkIterator::new(
1140                merge_iter,
1141                NonPkPrefixSkipWatermarkState::new(
1142                    BTreeMap::from_iter(once((TABLE_ID, t1_read_watermark.clone()))),
1143                    compaction_catalog_agent_ref,
1144                ),
1145            );
1146
1147            iter.rewind().await.unwrap();
1148            assert!(iter.is_valid());
1149            let mut t1_kv_pairs = (5..10_i32)
1150                .map(|i| {
1151                    let (k, v) = gen_key_value(
1152                        i as usize % 2,
1153                        10 - i,
1154                        0,
1155                        i,
1156                        i,
1157                        &t1_pk_serde,
1158                        &t1_pk_indices,
1159                    );
1160                    (k, v)
1161                })
1162                .collect_vec();
1163
1164            t1_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1165
1166            let mut t2_kv_pairs = (0..10_i32)
1167                .map(|i| {
1168                    gen_key_value(
1169                        i as usize % 2,
1170                        10 - i,
1171                        0,
1172                        0,
1173                        0,
1174                        &t2_pk_serde,
1175                        &t2_pk_indices,
1176                    )
1177                })
1178                .collect_vec();
1179
1180            t2_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1181            let mut index = 0;
1182            for _ in 0..t1_kv_pairs.len() {
1183                assert!(t1_kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
1184                iter.next().await.unwrap();
1185                index += 1;
1186            }
1187
1188            assert!(iter.is_valid());
1189            assert_eq!(t1_kv_pairs.len(), index);
1190
1191            index = 0;
1192            for _ in 0..t2_kv_pairs.len() {
1193                assert!(t2_kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
1194                iter.next().await.unwrap();
1195                index += 1;
1196            }
1197
1198            assert!(!iter.is_valid());
1199            assert_eq!(t2_kv_pairs.len(), index);
1200        }
1201
1202        {
1203            let t1_iter = t1_shared_buffer_batch.clone().into_forward_iter();
1204            let t2_iter = t2_shared_buffer_batch.clone().into_forward_iter();
1205            let iter_vec = vec![t1_iter, t2_iter];
1206            let merge_iter = MergeIterator::new(iter_vec);
1207
1208            let full_key_filter_key_extractor =
1209                FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
1210
1211            let table_id_to_vnode = HashMap::from_iter(
1212                vec![
1213                    (t1_id, VirtualNode::COUNT_FOR_TEST),
1214                    (t2_id, VirtualNode::COUNT_FOR_TEST),
1215                ]
1216                .into_iter(),
1217            );
1218
1219            let table_id_to_watermark_serde = HashMap::from_iter(
1220                vec![
1221                    (
1222                        t1_id,
1223                        Some((
1224                            t1_pk_serde.clone(),
1225                            watermark_col_serde.clone(),
1226                            t1_watermark_col_idx_in_pk,
1227                        )),
1228                    ),
1229                    (t2_id, None),
1230                ]
1231                .into_iter(),
1232            );
1233
1234            let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
1235                full_key_filter_key_extractor,
1236                table_id_to_vnode,
1237                table_id_to_watermark_serde,
1238            ));
1239
1240            let non_pk_prefix_iter = NonPkPrefixSkipWatermarkIterator::new(
1241                merge_iter,
1242                NonPkPrefixSkipWatermarkState::new(
1243                    BTreeMap::from_iter(once((t1_id, t1_read_watermark.clone()))),
1244                    compaction_catalog_agent_ref.clone(),
1245                ),
1246            );
1247
1248            let mut mix_iter = PkPrefixSkipWatermarkIterator::new(
1249                non_pk_prefix_iter,
1250                PkPrefixSkipWatermarkState::new(BTreeMap::from_iter(once((
1251                    t2_id,
1252                    t2_read_watermark.clone(),
1253                )))),
1254            );
1255
1256            mix_iter.rewind().await.unwrap();
1257            assert!(mix_iter.is_valid());
1258
1259            let mut t1_kv_pairs = (5..10_i32)
1260                .map(|i| {
1261                    let (k, v) = gen_key_value(
1262                        i as usize % 2,
1263                        10 - i,
1264                        0,
1265                        i,
1266                        i,
1267                        &t1_pk_serde,
1268                        &t1_pk_indices,
1269                    );
1270                    (k, v)
1271                })
1272                .collect_vec();
1273
1274            t1_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1275
1276            let mut t2_kv_pairs = (0..=5_i32)
1277                .map(|i| {
1278                    gen_key_value(
1279                        i as usize % 2,
1280                        10 - i,
1281                        0,
1282                        0,
1283                        0,
1284                        &t2_pk_serde,
1285                        &t2_pk_indices,
1286                    )
1287                })
1288                .collect_vec();
1289
1290            t2_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1291
1292            let mut index = 0;
1293            for _ in 0..t1_kv_pairs.len() {
1294                assert!(
1295                    t1_kv_pairs[index].0.as_ref() == mix_iter.key().user_key.table_key.as_ref()
1296                );
1297                mix_iter.next().await.unwrap();
1298                index += 1;
1299            }
1300
1301            assert!(mix_iter.is_valid());
1302            assert_eq!(t1_kv_pairs.len(), index);
1303
1304            index = 0;
1305
1306            for _ in 0..t2_kv_pairs.len() {
1307                assert!(
1308                    t2_kv_pairs[index].0.as_ref() == mix_iter.key().user_key.table_key.as_ref()
1309                );
1310                mix_iter.next().await.unwrap();
1311                index += 1;
1312            }
1313
1314            assert!(!mix_iter.is_valid());
1315            assert_eq!(t2_kv_pairs.len(), index);
1316        }
1317    }
1318}