risingwave_storage/hummock/iterator/skip_watermark.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::collections::{BTreeMap, VecDeque};

use bytes::Bytes;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_common::row::Row;
use risingwave_common::types::Datum;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::safe_epoch_read_table_watermarks_impl;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
use risingwave_hummock_sdk::table_watermark::{
    ReadTableWatermark, TableWatermarks, WatermarkDirection,
};

use super::SkipWatermarkState;
use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
use crate::hummock::HummockResult;
use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
use crate::hummock::value::HummockValue;
use crate::monitor::StoreLocalStatistic;

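/// A forward iterator wrapper that skips key-value pairs filtered out by
/// per-vnode table watermarks, accumulating per-table stats of the skipped
/// entries as it goes.
///
/// A minimal usage sketch (not a doc-test; `inner` and `watermarks` are
/// assumed to be supplied by the caller):
///
/// ```ignore
/// let state = PkPrefixSkipWatermarkState::new(watermarks);
/// let mut iter = SkipWatermarkIterator::new(inner, state);
/// iter.rewind().await?;
/// while iter.is_valid() {
///     // Only keys that survive the watermark filter are observed here.
///     iter.next().await?;
/// }
/// ```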
pub struct SkipWatermarkIterator<I, S> {
    inner: I,
    state: S,
    /// The stats of skipped key-value pairs for each table.
    skipped_entry_table_stats: TableStatsMap,
    /// The id of the table currently being processed.
    last_table_id: Option<TableId>,
    /// The stats of the table currently being processed.
    last_table_stats: TableStats,
}

impl<I: HummockIterator<Direction = Forward>, S: SkipWatermarkState> SkipWatermarkIterator<I, S> {
    pub fn new(inner: I, state: S) -> Self {
        Self {
            inner,
            state,
            skipped_entry_table_stats: TableStatsMap::default(),
            last_table_id: None,
            last_table_stats: TableStats::default(),
        }
    }

    fn reset_watermark(&mut self) {
        self.state.reset_watermark();
    }

    fn reset_skipped_entry_table_stats(&mut self) {
        self.skipped_entry_table_stats = TableStatsMap::default();
        self.last_table_id = None;
        self.last_table_stats = TableStats::default();
    }

    /// Advance the key until the iterator becomes invalid or the current key
    /// is no longer filtered by the first remaining watermark.
    ///
    /// The caller should ensure that the first remaining watermark has been
    /// advanced to the current key before calling this method.
    async fn advance_key_and_watermark(&mut self) -> HummockResult<()> {
        // Advance the key and the watermark in an interleaved manner until
        // neither of them changes.
        while self.inner.is_valid() {
            if !self.state.should_delete(&self.inner.key()) {
                break;
            }

            if self
                .last_table_id
                .is_none_or(|last_table_id| last_table_id != self.inner.key().user_key.table_id)
            {
                self.add_last_table_stats();
                self.last_table_id = Some(self.inner.key().user_key.table_id);
            }
            self.last_table_stats.total_key_count -= 1;
            self.last_table_stats.total_key_size -= self.inner.key().encoded_len() as i64;
            self.last_table_stats.total_value_size -= self.inner.value().encoded_len() as i64;

            self.inner.next().await?;
        }
        self.add_last_table_stats();
        Ok(())
    }

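    /// Fold the accumulated deltas for the current table into the per-table
    /// skipped-entry stats. The deltas are negative, since they record entries
    /// dropped by the watermark; `collect_local_statistic` later merges them
    /// into `skipped_by_watermark_table_stats`.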
    fn add_last_table_stats(&mut self) {
        let Some(last_table_id) = self.last_table_id.take() else {
            return;
        };
        let delta = std::mem::take(&mut self.last_table_stats);
        let e = self
            .skipped_entry_table_stats
            .entry(last_table_id)
            .or_default();
        e.total_key_count += delta.total_key_count;
        e.total_key_size += delta.total_key_size;
        e.total_value_size += delta.total_value_size;
    }
}

impl<I: HummockIterator<Direction = Forward>, S: SkipWatermarkState> HummockIterator
    for SkipWatermarkIterator<I, S>
{
    type Direction = Forward;

    async fn next(&mut self) -> HummockResult<()> {
        self.inner.next().await?;
        // Check whether there is any remaining watermark and return early to
        // avoid calling the async `advance_key_and_watermark`: a performance
        // regression was observed in benchmarks without this early return.
        if self.state.has_watermark() {
            self.advance_key_and_watermark().await?;
        }
        Ok(())
    }

    fn key(&self) -> FullKey<&[u8]> {
        self.inner.key()
    }

    fn value(&self) -> HummockValue<&[u8]> {
        self.inner.value()
    }

    fn is_valid(&self) -> bool {
        self.inner.is_valid()
    }

    async fn rewind(&mut self) -> HummockResult<()> {
        self.reset_watermark();
        self.reset_skipped_entry_table_stats();
        self.inner.rewind().await?;
        self.advance_key_and_watermark().await?;
        Ok(())
    }

    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
        self.reset_watermark();
        self.reset_skipped_entry_table_stats();
        self.inner.seek(key).await?;
        self.advance_key_and_watermark().await?;
        Ok(())
    }

    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
        add_table_stats_map(
            &mut stats.skipped_by_watermark_table_stats,
            &self.skipped_entry_table_stats,
        );
        self.inner.collect_local_statistic(stats)
    }

    fn value_meta(&self) -> ValueMeta {
        self.inner.value_meta()
    }
}

pub struct PkPrefixSkipWatermarkState {
    watermarks: BTreeMap<TableId, ReadTableWatermark>,
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Bytes)>,
}

impl SkipWatermarkState for PkPrefixSkipWatermarkState {
    #[inline(always)]
    fn has_watermark(&self) -> bool {
        !self.remain_watermarks.is_empty()
    }

    fn should_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
        if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            let key_table_id = key.user_key.table_id;
            let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
            match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
                Ordering::Less => {
                    return false;
                }
                Ordering::Equal => {
                    return direction.key_filter_by_watermark(inner_key, watermark);
                }
                Ordering::Greater => {
                    // The current key has advanced over the watermark.
                    // We may advance the watermark before advancing the key.
                    return self.advance_watermark(key);
                }
            }
        }
        false
    }

    fn reset_watermark(&mut self) {
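        // `watermarks` is a `BTreeMap`, so flattening it yields entries sorted
        // by `(table_id, vnode)`, matching the forward key order of the
        // iterator; `should_delete` and `advance_watermark` rely on this order.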
        self.remain_watermarks = self
            .watermarks
            .iter()
            .flat_map(|(table_id, read_watermarks)| {
                read_watermarks
                    .vnode_watermarks
                    .iter()
                    .map(|(vnode, watermarks)| {
                        (
                            *table_id,
                            *vnode,
                            read_watermarks.direction,
                            watermarks.clone(),
                        )
                    })
            })
            .collect();
    }

    /// Advance the watermark until no watermark remains or the first remaining
    /// watermark can possibly filter out the current or a future key.
    ///
    /// Return a flag indicating whether the current key will be filtered by the current watermark.
    fn advance_watermark(&mut self, key: &FullKey<&[u8]>) -> bool {
        let key_table_id = key.user_key.table_id;
        let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
        while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
                Ordering::Less => {
                    self.remain_watermarks.pop_front();
                    continue;
                }
                Ordering::Equal => {
                    match direction {
                        WatermarkDirection::Ascending => {
                            match inner_key.cmp(watermark.as_ref()) {
                                Ordering::Less => {
                                    // The current key will be filtered by the watermark.
                                    // Return true to further advance the key.
                                    return true;
                                }
                                Ordering::Equal | Ordering::Greater => {
                                    // The current key has passed the watermark.
                                    // Advance to the next watermark.
                                    self.remain_watermarks.pop_front();
                                    // Since it is impossible for a (table_id, vnode) tuple to have multiple
                                    // watermarks, after the pop_front the next (table_id, vnode) must have
                                    // exceeded the current key, and we can directly return and mark that the
                                    // current key is not filtered by the watermark at the front.
                                    #[cfg(debug_assertions)]
                                    {
                                        if let Some((next_table_id, next_vnode, _, _)) =
                                            self.remain_watermarks.front()
                                        {
                                            assert!(
                                                (next_table_id, next_vnode)
                                                    > (&key_table_id, &key_vnode)
                                            );
                                        }
                                    }
                                    return false;
                                }
                            }
                        }
                        WatermarkDirection::Descending => {
                            return match inner_key.cmp(watermark.as_ref()) {
                                // The current key has not reached the watermark. Just return.
                                Ordering::Less | Ordering::Equal => false,
                                // The current key will be filtered by the watermark.
                                // Return true to further advance the key.
                                Ordering::Greater => true,
                            };
                        }
                    }
                }
                Ordering::Greater => {
                    return false;
                }
            }
        }
        false
    }
}

impl PkPrefixSkipWatermarkState {
    pub fn new(watermarks: BTreeMap<TableId, ReadTableWatermark>) -> Self {
        Self {
            remain_watermarks: VecDeque::new(),
            watermarks,
        }
    }

    pub fn from_safe_epoch_watermarks(
        safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
    ) -> Self {
        let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
        Self::new(watermarks)
    }
}

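/// Skip-watermark state for tables whose watermark column is not the first
/// column of the primary key. Unlike `PkPrefixSkipWatermarkState`, which
/// compares serialized key bytes directly, this state deserializes each pk
/// with the table's `OrderedRowSerde` (looked up through the compaction
/// catalog) to extract the watermark column, then compares it as a `Datum`
/// against the deserialized watermark.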
pub struct NonPkPrefixSkipWatermarkState {
    watermarks: BTreeMap<TableId, ReadTableWatermark>,
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Datum)>,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,

    /// Cached `(pk_serde, watermark_col_serde, watermark_col_idx_in_pk)` for
    /// `last_table_id`, to avoid a catalog lookup per key.
    last_serde: Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
    /// The id of the table currently being processed.
    last_table_id: Option<TableId>,
}

impl NonPkPrefixSkipWatermarkState {
    pub fn new(
        watermarks: BTreeMap<TableId, ReadTableWatermark>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> Self {
        Self {
            remain_watermarks: VecDeque::new(),
            watermarks,
            compaction_catalog_agent_ref,
            last_serde: None,
            last_table_id: None,
        }
    }

    pub fn from_safe_epoch_watermarks(
        safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> Self {
        let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
        Self::new(watermarks, compaction_catalog_agent_ref)
    }
}

impl SkipWatermarkState for NonPkPrefixSkipWatermarkState {
    #[inline(always)]
    fn has_watermark(&self) -> bool {
        !self.remain_watermarks.is_empty()
    }

    fn should_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
        if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            let key_table_id = key.user_key.table_id;
            // Refresh the cached serde when the key enters a new table.
            if self
                .last_table_id
                .is_none_or(|last_table_id| last_table_id != key_table_id)
            {
                self.last_table_id = Some(key_table_id);
                self.last_serde = self.compaction_catalog_agent_ref.watermark_serde(*table_id);
            }

            let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
            match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
                Ordering::Less => {
                    return false;
                }
                Ordering::Equal => {
                    let (pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk) =
                        self.last_serde.as_ref().unwrap();
                    let row = pk_prefix_serde
                        .deserialize(inner_key)
                        .unwrap_or_else(|_| {
                            panic!("Failed to deserialize pk_prefix inner_key {:?} serde data_types {:?} order_types {:?}", inner_key, pk_prefix_serde.get_data_types(), pk_prefix_serde.get_order_types());
                        });
                    let watermark_col_in_pk = row.datum_at(*watermark_col_idx_in_pk);
                    return direction.datum_filter_by_watermark(
                        watermark_col_in_pk,
                        watermark,
                        watermark_col_serde.get_order_types()[0],
                    );
                }
                Ordering::Greater => {
                    // The current key has advanced over the watermark.
                    // We may advance the watermark before advancing the key.
                    return self.advance_watermark(key);
                }
            }
        }
        false
    }

    fn reset_watermark(&mut self) {
        self.remain_watermarks = self
            .watermarks
            .iter()
            .flat_map(|(table_id, read_watermarks)| {
                let watermark_serde = self
                    .compaction_catalog_agent_ref
                    .watermark_serde(*table_id)
                    .map(|(_pk_serde, watermark_serde, _watermark_col_idx_in_pk)| watermark_serde);

                read_watermarks
                    .vnode_watermarks
                    .iter()
                    .flat_map(move |(vnode, watermarks)| {
                        // TODO(ttl): if the watermark column is in the value, we may get a `None` here; support it.
                        let watermark_serde = watermark_serde.as_ref()?;
                        Some((
                            *table_id,
                            *vnode,
                            read_watermarks.direction,
                            {
                                let row = watermark_serde.deserialize(watermarks).unwrap_or_else(|_| {
                                    panic!("Failed to deserialize watermark {:?} serde data_types {:?} order_types {:?}", watermarks, watermark_serde.get_data_types(), watermark_serde.get_order_types());
                                });
                                row[0].clone()
                            },
                        ))
                    })
            })
            .collect();
    }

    fn advance_watermark(&mut self, key: &FullKey<&[u8]>) -> bool {
        let key_table_id = key.user_key.table_id;
        let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
        while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
                Ordering::Less => {
                    self.remain_watermarks.pop_front();
                    continue;
                }
                Ordering::Equal => {
                    let (pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk) =
                        self.last_serde.as_ref().unwrap();

                    let row = pk_prefix_serde
                        .deserialize(inner_key)
                        .unwrap_or_else(|_| {
                            panic!("Failed to deserialize pk_prefix inner_key {:?} serde data_types {:?} order_types {:?}", inner_key, pk_prefix_serde.get_data_types(), pk_prefix_serde.get_order_types());
                        });
                    let watermark_col_in_pk = row.datum_at(*watermark_col_idx_in_pk);

                    return direction.datum_filter_by_watermark(
                        watermark_col_in_pk,
                        watermark,
                        watermark_col_serde.get_order_types()[0],
                    );
                }
                Ordering::Greater => {
                    return false;
                }
            }
        }
        false
    }
}

pub type PkPrefixSkipWatermarkIterator<I> = SkipWatermarkIterator<I, PkPrefixSkipWatermarkState>;

pub type NonPkPrefixSkipWatermarkIterator<I> =
    SkipWatermarkIterator<I, NonPkPrefixSkipWatermarkState>;
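
// When a compaction input mixes tables with pk-prefix watermarks and tables
// with non-pk-prefix watermarks, the two layers can be stacked, as exercised
// by `test_mix_watermark` below. A minimal sketch (hypothetical `merge_iter`,
// watermark maps and `catalog_agent` built by the caller):
//
//     let iter = PkPrefixSkipWatermarkIterator::new(
//         NonPkPrefixSkipWatermarkIterator::new(
//             merge_iter,
//             NonPkPrefixSkipWatermarkState::new(non_pk_watermarks, catalog_agent),
//         ),
//         PkPrefixSkipWatermarkState::new(pk_watermarks),
//     );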

#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap};
    use std::iter::{empty, once};
    use std::sync::Arc;

    use bytes::Bytes;
    use itertools::Itertools;
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::row::{OwnedRow, RowExt};
    use risingwave_common::types::{DataType, ScalarImpl};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::row_serde::OrderedRowSerde;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_hummock_sdk::EpochWithGap;
    use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey, gen_key_from_str};
    use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, WatermarkDirection};

    use super::PkPrefixSkipWatermarkState;
    use crate::compaction_catalog_manager::{
        CompactionCatalogAgent, FilterKeyExtractorImpl, FullKeyFilterKeyExtractor,
    };
    use crate::hummock::iterator::{
        HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
        NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator,
    };
    use crate::hummock::shared_buffer::shared_buffer_batch::{
        SharedBufferBatch, SharedBufferValue,
    };
    use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode};

    const EPOCH: u64 = test_epoch(1);
    const TABLE_ID: TableId = TableId::new(233);

    async fn assert_iter_eq(
        mut first: Option<impl HummockIterator>,
        mut second: impl HummockIterator,
        seek_key: Option<(usize, usize)>,
    ) {
        if let Some((vnode, key_index)) = seek_key {
            let (seek_key, _) = gen_key_value(vnode, key_index);
            let full_key = FullKey {
                user_key: UserKey {
                    table_id: TABLE_ID,
                    table_key: seek_key,
                },
                epoch_with_gap: EpochWithGap::new_from_epoch(EPOCH),
            };
            if let Some(first) = &mut first {
                first.seek(full_key.to_ref()).await.unwrap();
            }
            second.seek(full_key.to_ref()).await.unwrap()
        } else {
            if let Some(first) = &mut first {
                first.rewind().await.unwrap();
            }
            second.rewind().await.unwrap();
        }

        if let Some(first) = &mut first {
            while first.is_valid() {
                assert!(second.is_valid());
                let first_key = first.key();
                let second_key = second.key();
                assert_eq!(first_key, second_key);
                assert_eq!(first.value(), second.value());
                first.next().await.unwrap();
                second.next().await.unwrap();
            }
        }
        assert!(!second.is_valid());
    }

    fn build_batch(
        pairs: impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)>,
        table_id: TableId,
    ) -> Option<SharedBufferBatch> {
        let pairs: Vec<_> = pairs.collect();
        if pairs.is_empty() {
            None
        } else {
            Some(SharedBufferBatch::for_test(pairs, EPOCH, table_id))
        }
    }

    fn filter_with_watermarks(
        iter: impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)>,
        table_watermarks: ReadTableWatermark,
    ) -> impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)> {
        iter.filter(move |(key, _)| {
            if let Some(watermark) = table_watermarks.vnode_watermarks.get(&key.vnode_part()) {
                !table_watermarks
                    .direction
                    .key_filter_by_watermark(key.key_part(), watermark)
            } else {
                true
            }
        })
    }

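    /// Generate an inner key space-padded to width 5 (`{:5}`), so that the
    /// lexicographic byte order of the generated keys matches the numeric
    /// order of `index` for the small indices used in these tests.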
    fn gen_inner_key(index: usize) -> String {
        format!("key{:5}", index)
    }

    async fn test_watermark(
        watermarks: impl IntoIterator<Item = (usize, usize)>,
        direction: WatermarkDirection,
    ) {
        let test_index: [(usize, usize); 7] =
            [(0, 2), (0, 3), (0, 4), (1, 1), (1, 3), (4, 2), (8, 1)];
        let items = test_index
            .iter()
            .map(|(vnode, key_index)| gen_key_value(*vnode, *key_index))
            .collect_vec();

        let read_watermark = ReadTableWatermark {
            direction,
            vnode_watermarks: BTreeMap::from_iter(watermarks.into_iter().map(
                |(vnode, key_index)| {
                    (
                        VirtualNode::from_index(vnode),
                        Bytes::from(gen_inner_key(key_index)),
                    )
                },
            )),
        };

        let gen_iters = || {
            let batch = build_batch(
                filter_with_watermarks(items.clone().into_iter(), read_watermark.clone()),
                TABLE_ID,
            );

            let iter = PkPrefixSkipWatermarkIterator::new(
                build_batch(items.clone().into_iter(), TABLE_ID)
                    .unwrap()
                    .into_forward_iter(),
                PkPrefixSkipWatermarkState::new(BTreeMap::from_iter(once((
                    TABLE_ID,
                    read_watermark.clone(),
                )))),
            );
            (batch.map(|batch| batch.into_forward_iter()), iter)
        };
        let (first, second) = gen_iters();
        assert_iter_eq(first, second, None).await;
        for (vnode, key_index) in &test_index {
            let (first, second) = gen_iters();
            assert_iter_eq(first, second, Some((*vnode, *key_index))).await;
        }
        let (last_vnode, last_key_index) = test_index.last().unwrap();
        let (first, second) = gen_iters();
        assert_iter_eq(first, second, Some((*last_vnode, last_key_index + 1))).await;
    }

    fn gen_key_value(vnode: usize, index: usize) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
        (
            gen_key_from_str(VirtualNode::from_index(vnode), &gen_inner_key(index)),
            SharedBufferValue::Insert(Bytes::copy_from_slice(
                format!("{}-value-{}", vnode, index).as_bytes(),
            )),
        )
    }

    #[tokio::test]
    async fn test_no_watermark() {
        test_watermark(empty(), WatermarkDirection::Ascending).await;
        test_watermark(empty(), WatermarkDirection::Descending).await;
    }

    #[tokio::test]
    async fn test_too_low_watermark() {
        test_watermark(vec![(0, 0)], WatermarkDirection::Ascending).await;
        test_watermark(vec![(0, 10)], WatermarkDirection::Descending).await;
    }

    #[tokio::test]
    async fn test_single_watermark() {
        test_watermark(vec![(0, 3)], WatermarkDirection::Ascending).await;
        test_watermark(vec![(0, 3)], WatermarkDirection::Descending).await;
    }

    #[tokio::test]
    async fn test_watermark_vnode_no_data() {
        test_watermark(vec![(3, 3)], WatermarkDirection::Ascending).await;
        test_watermark(vec![(3, 3)], WatermarkDirection::Descending).await;
    }

    #[tokio::test]
    async fn test_filter_all() {
        test_watermark(
            vec![(0, 5), (1, 4), (2, 0), (4, 3), (8, 2)],
            WatermarkDirection::Ascending,
        )
        .await;
        test_watermark(
            vec![(0, 0), (1, 0), (2, 0), (4, 0), (8, 0)],
            WatermarkDirection::Descending,
        )
        .await;
    }

    #[tokio::test]
    async fn test_advance_multi_vnode() {
        test_watermark(vec![(1, 2), (8, 0)], WatermarkDirection::Ascending).await;
    }

    #[tokio::test]
    async fn test_non_pk_prefix_watermark() {
        fn gen_key_value(
            vnode: usize,
            col_0: i32,
            col_1: i32,
            col_2: i32,
            col_3: i32,
            pk_serde: &OrderedRowSerde,
            pk_indices: &[usize],
        ) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
            let r = OwnedRow::new(vec![
                Some(ScalarImpl::Int32(col_0)),
                Some(ScalarImpl::Int32(col_1)),
                Some(ScalarImpl::Int32(col_2)), // watermark column
                Some(ScalarImpl::Int32(col_3)),
            ]);

            let pk = r.project(pk_indices);

            let k1 = serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode));
            let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice(
                format!("{}-value-{}", vnode, col_2).as_bytes(),
            ));
            (k1, v1)
        }

        let watermark_direction = WatermarkDirection::Ascending;

        let watermark_col_serde =
            OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]);
        let pk_serde = OrderedRowSerde::new(
            vec![DataType::Int32, DataType::Int32, DataType::Int32],
            vec![
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
        );

        let pk_indices = vec![0, 2, 3];
        let watermark_col_idx_in_pk = 1;

        {
            // test single vnode
            let shared_buffer_batch = {
                let mut kv_pairs = (0..10)
                    .map(|i| gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices))
                    .collect_vec();
                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                build_batch(kv_pairs.into_iter(), TABLE_ID)
            }
            .unwrap();

            {
                // empty read watermark
                let read_watermark = ReadTableWatermark {
                    direction: watermark_direction,
                    vnode_watermarks: BTreeMap::default(),
                };

                let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![TABLE_ID]);

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                for i in 0..10 {
                    let (k, _v) = gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices);
                    assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref());
                    iter.next().await.unwrap();
                }
                assert!(!iter.is_valid());
            }

            {
                // test watermark
                let watermark = {
                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
                    serialize_pk(r1, &watermark_col_serde)
                };

                let read_watermark = ReadTableWatermark {
                    direction: watermark_direction,
                    vnode_watermarks: BTreeMap::from_iter(once((
                        VirtualNode::from_index(0),
                        watermark.clone(),
                    ))),
                };

                let full_key_filter_key_extractor =
                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

                let table_id_to_vnode =
                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

                let table_id_to_watermark_serde = HashMap::from_iter(once((
                    TABLE_ID,
                    Some((
                        pk_serde.clone(),
                        watermark_col_serde.clone(),
                        watermark_col_idx_in_pk,
                    )),
                )));

                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                    full_key_filter_key_extractor,
                    table_id_to_vnode,
                    table_id_to_watermark_serde,
                ));

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                for i in 5..10 {
                    let (k, _v) = gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices);
                    assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref());
                    iter.next().await.unwrap();
                }
                assert!(!iter.is_valid());
            }
        }

        {
            // test multi vnode
            let shared_buffer_batch = {
                let mut kv_pairs = (0..10_i32)
                    .map(|i| gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices))
                    .collect_vec();

                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                build_batch(kv_pairs.into_iter(), TABLE_ID)
            };

            {
                // test watermark
                let watermark = {
                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
                    serialize_pk(r1, &watermark_col_serde)
                };

                let read_watermark = ReadTableWatermark {
                    direction: watermark_direction,
                    vnode_watermarks: BTreeMap::from_iter(
                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
                    ),
                };

                let full_key_filter_key_extractor =
                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

                let table_id_to_vnode =
                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

                let table_id_to_watermark_serde = HashMap::from_iter(once((
                    TABLE_ID,
                    Some((
                        pk_serde.clone(),
                        watermark_col_serde.clone(),
                        watermark_col_idx_in_pk,
                    )),
                )));

                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                    full_key_filter_key_extractor,
                    table_id_to_vnode,
                    table_id_to_watermark_serde,
                ));

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                let mut kv_pairs = (5..10_i32)
                    .map(|i| {
                        let (k, v) =
                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
                        (k, v)
                    })
                    .collect_vec();
                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                let mut index = 0;
                while iter.is_valid() {
                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                    iter.next().await.unwrap();
                    index += 1;
                }
            }

            {
                // test watermark Descending
                let watermark = {
                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
                    serialize_pk(r1, &watermark_col_serde)
                };

                let read_watermark = ReadTableWatermark {
                    direction: WatermarkDirection::Descending,
                    vnode_watermarks: BTreeMap::from_iter(
                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
                    ),
                };

                let full_key_filter_key_extractor =
                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

                let table_id_to_vnode =
                    HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

                let table_id_to_watermark_serde = HashMap::from_iter(once((
                    TABLE_ID,
                    Some((
                        pk_serde.clone(),
                        watermark_col_serde.clone(),
                        watermark_col_idx_in_pk,
                    )),
                )));

                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                    full_key_filter_key_extractor,
                    table_id_to_vnode,
                    table_id_to_watermark_serde,
                ));

                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
                    NonPkPrefixSkipWatermarkState::new(
                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
                        compaction_catalog_agent_ref,
                    ),
                );

                iter.rewind().await.unwrap();
                assert!(iter.is_valid());
                let mut kv_pairs = (0..=5_i32)
                    .map(|i| {
                        let (k, v) =
                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
                        (k, v)
                    })
                    .collect_vec();
                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
                let mut index = 0;
                while iter.is_valid() {
                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                    iter.next().await.unwrap();
                    index += 1;
                }
            }
        }
    }

    #[tokio::test]
    async fn test_mix_watermark() {
        fn gen_key_value(
            vnode: usize,
            col_0: i32,
            col_1: i32,
            col_2: i32,
            col_3: i32,
            pk_serde: &OrderedRowSerde,
            pk_indices: &[usize],
        ) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
            let r = OwnedRow::new(vec![
                Some(ScalarImpl::Int32(col_0)),
                Some(ScalarImpl::Int32(col_1)),
                Some(ScalarImpl::Int32(col_2)), // watermark column
                Some(ScalarImpl::Int32(col_3)),
            ]);

            let pk = r.project(pk_indices);

            let k1 = serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode));
            let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice(
                format!("{}-value-{}-{}-{}-{}", vnode, col_0, col_1, col_2, col_3).as_bytes(),
            ));

            (k1, v1)
        }

        let watermark_col_serde =
            OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]);
        let t1_pk_serde = OrderedRowSerde::new(
            vec![DataType::Int32, DataType::Int32, DataType::Int32],
            vec![
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
        );

        let t1_pk_indices = vec![0, 2, 3];
        let t1_watermark_col_idx_in_pk = 1;

        let t2_pk_indices = vec![0, 1, 2];

        let t2_pk_serde = OrderedRowSerde::new(
            vec![DataType::Int32, DataType::Int32, DataType::Int32],
            vec![
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
        );

        let t1_id = TABLE_ID;
        let t2_id = TableId::from(t1_id.as_raw_id() + 1);

        let t1_shared_buffer_batch = {
            let mut kv_pairs = (0..10_i32)
                .map(|i| {
                    gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        i,
                        i,
                        &t1_pk_serde,
                        &t1_pk_indices,
                    )
                })
                .collect_vec();

            kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
            build_batch(kv_pairs.into_iter(), t1_id).unwrap()
        };

        let t2_shared_buffer_batch = {
            let mut kv_pairs = (0..10_i32)
                .map(|i| {
                    gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        0,
                        0,
                        &t2_pk_serde,
                        &t2_pk_indices,
                    )
                })
                .collect_vec();

            kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
            build_batch(kv_pairs.into_iter(), t2_id).unwrap()
        };

        let t1_watermark = {
            let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
            serialize_pk(r1, &watermark_col_serde)
        };

        let t1_read_watermark = ReadTableWatermark {
            direction: WatermarkDirection::Ascending,
            vnode_watermarks: BTreeMap::from_iter(
                (0..2).map(|i| (VirtualNode::from_index(i), t1_watermark.clone())),
            ),
        };

        let t2_watermark = {
            let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
            serialize_pk(r1, &watermark_col_serde)
        };

        let t2_read_watermark = ReadTableWatermark {
            direction: WatermarkDirection::Ascending,
            vnode_watermarks: BTreeMap::from_iter(
                (0..2).map(|i| (VirtualNode::from_index(i), t2_watermark.clone())),
            ),
        };

        {
            // test non pk prefix watermark
            let t1_iter = t1_shared_buffer_batch.clone().into_forward_iter();
            let t2_iter = t2_shared_buffer_batch.clone().into_forward_iter();
            let iter_vec = vec![t1_iter, t2_iter];
            let merge_iter = MergeIterator::new(iter_vec);

            let full_key_filter_key_extractor =
                FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

            let table_id_to_vnode =
                HashMap::from_iter(once((TABLE_ID, VirtualNode::COUNT_FOR_TEST)));

            let table_id_to_watermark_serde = HashMap::from_iter(once((
                t1_id,
                Some((
                    t1_pk_serde.clone(),
                    watermark_col_serde.clone(),
                    t1_watermark_col_idx_in_pk,
                )),
            )));

            let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                full_key_filter_key_extractor,
                table_id_to_vnode,
                table_id_to_watermark_serde,
            ));

            let mut iter = NonPkPrefixSkipWatermarkIterator::new(
                merge_iter,
                NonPkPrefixSkipWatermarkState::new(
                    BTreeMap::from_iter(once((TABLE_ID, t1_read_watermark.clone()))),
                    compaction_catalog_agent_ref,
                ),
            );

            iter.rewind().await.unwrap();
            assert!(iter.is_valid());
            let mut t1_kv_pairs = (5..10_i32)
                .map(|i| {
                    let (k, v) = gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        i,
                        i,
                        &t1_pk_serde,
                        &t1_pk_indices,
                    );
                    (k, v)
                })
                .collect_vec();

            t1_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));

            let mut t2_kv_pairs = (0..10_i32)
                .map(|i| {
                    gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        0,
                        0,
                        &t2_pk_serde,
                        &t2_pk_indices,
                    )
                })
                .collect_vec();

            t2_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
            let mut index = 0;
            for _ in 0..t1_kv_pairs.len() {
                assert!(t1_kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                iter.next().await.unwrap();
                index += 1;
            }

            assert!(iter.is_valid());
            assert_eq!(t1_kv_pairs.len(), index);

            index = 0;
            for _ in 0..t2_kv_pairs.len() {
                assert!(t2_kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
                iter.next().await.unwrap();
                index += 1;
            }

            assert!(!iter.is_valid());
            assert_eq!(t2_kv_pairs.len(), index);
        }

        {
            let t1_iter = t1_shared_buffer_batch.clone().into_forward_iter();
            let t2_iter = t2_shared_buffer_batch.clone().into_forward_iter();
            let iter_vec = vec![t1_iter, t2_iter];
            let merge_iter = MergeIterator::new(iter_vec);

            let full_key_filter_key_extractor =
                FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);

            let table_id_to_vnode = HashMap::from_iter(
                vec![
                    (t1_id, VirtualNode::COUNT_FOR_TEST),
                    (t2_id, VirtualNode::COUNT_FOR_TEST),
                ]
                .into_iter(),
            );

            let table_id_to_watermark_serde = HashMap::from_iter(
                vec![
                    (
                        t1_id,
                        Some((
                            t1_pk_serde.clone(),
                            watermark_col_serde.clone(),
                            t1_watermark_col_idx_in_pk,
                        )),
                    ),
                    (t2_id, None),
                ]
                .into_iter(),
            );

            let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                full_key_filter_key_extractor,
                table_id_to_vnode,
                table_id_to_watermark_serde,
            ));

            let non_pk_prefix_iter = NonPkPrefixSkipWatermarkIterator::new(
                merge_iter,
                NonPkPrefixSkipWatermarkState::new(
                    BTreeMap::from_iter(once((t1_id, t1_read_watermark.clone()))),
                    compaction_catalog_agent_ref.clone(),
                ),
            );

            let mut mix_iter = PkPrefixSkipWatermarkIterator::new(
                non_pk_prefix_iter,
                PkPrefixSkipWatermarkState::new(BTreeMap::from_iter(once((
                    t2_id,
                    t2_read_watermark.clone(),
                )))),
            );

            mix_iter.rewind().await.unwrap();
            assert!(mix_iter.is_valid());

            let mut t1_kv_pairs = (5..10_i32)
                .map(|i| {
                    let (k, v) = gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        i,
                        i,
                        &t1_pk_serde,
                        &t1_pk_indices,
                    );
                    (k, v)
                })
                .collect_vec();

            t1_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));

            let mut t2_kv_pairs = (0..=5_i32)
                .map(|i| {
                    gen_key_value(
                        i as usize % 2,
                        10 - i,
                        0,
                        0,
                        0,
                        &t2_pk_serde,
                        &t2_pk_indices,
                    )
                })
                .collect_vec();

            t2_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));

            let mut index = 0;
            for _ in 0..t1_kv_pairs.len() {
                assert!(
                    t1_kv_pairs[index].0.as_ref() == mix_iter.key().user_key.table_key.as_ref()
                );
                mix_iter.next().await.unwrap();
                index += 1;
            }

            assert!(mix_iter.is_valid());
            assert_eq!(t1_kv_pairs.len(), index);

            index = 0;

            for _ in 0..t2_kv_pairs.len() {
                assert!(
                    t2_kv_pairs[index].0.as_ref() == mix_iter.key().user_key.table_key.as_ref()
                );
                mix_iter.next().await.unwrap();
                index += 1;
            }

            assert!(!mix_iter.is_valid());
            assert_eq!(t2_kv_pairs.len(), index);
        }
    }
}