risingwave_storage/hummock/iterator/skip_watermark.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::{BTreeMap, VecDeque};
17
18use bytes::Bytes;
19use risingwave_common::catalog::TableId;
20use risingwave_common::hash::VirtualNode;
21use risingwave_common::row::Row;
22use risingwave_common::types::Datum;
23use risingwave_common::util::row_serde::OrderedRowSerde;
24use risingwave_hummock_sdk::compaction_group::hummock_version_ext::safe_epoch_read_table_watermarks_impl;
25use risingwave_hummock_sdk::key::FullKey;
26use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
27use risingwave_hummock_sdk::table_watermark::{
28    ReadTableWatermark, TableWatermarks, WatermarkDirection,
29};
30
31use super::SkipWatermarkState;
32use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
33use crate::hummock::HummockResult;
34use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
35use crate::hummock::value::HummockValue;
36use crate::monitor::StoreLocalStatistic;
37
/// A forward iterator that wraps an inner `HummockIterator` and transparently
/// skips key-value pairs filtered out by the table watermarks tracked in `state`.
/// Skipped entries are accumulated into per-table stats so they can be reported
/// via `collect_local_statistic`.
pub struct SkipWatermarkIterator<I, S> {
    /// The underlying forward iterator being filtered.
    inner: I,
    /// Watermark state deciding whether the current key should be skipped.
    state: S,
    /// The stats of skipped key-value pairs for each table.
    skipped_entry_table_stats: TableStatsMap,
    /// The id of table currently undergoing processing.
    last_table_id: Option<u32>,
    /// The stats of table currently undergoing processing.
    last_table_stats: TableStats,
}
48
impl<I: HummockIterator<Direction = Forward>, S: SkipWatermarkState> SkipWatermarkIterator<I, S> {
    /// Creates a new `SkipWatermarkIterator` wrapping `inner`, filtering with `state`.
    pub fn new(inner: I, state: S) -> Self {
        Self {
            inner,
            state,
            skipped_entry_table_stats: TableStatsMap::default(),
            last_table_id: None,
            last_table_stats: TableStats::default(),
        }
    }

    /// Rebuilds the remaining-watermark queue in `state` (called on rewind/seek).
    fn reset_watermark(&mut self) {
        self.state.reset_watermark();
    }

    /// Clears all accumulated skipped-entry statistics (called on rewind/seek).
    fn reset_skipped_entry_table_stats(&mut self) {
        self.skipped_entry_table_stats = TableStatsMap::default();
        self.last_table_id = None;
        self.last_table_stats = TableStats::default();
    }

    /// Advance the key until iterator invalid or the current key will not be filtered by the latest watermark.
    /// Calling this method should ensure that the first remaining watermark has been advanced to the current key.
    ///
    /// Every skipped entry is recorded as a negative delta in the per-table stats.
    async fn advance_key_and_watermark(&mut self) -> HummockResult<()> {
        // advance key and watermark in an interleave manner until nothing
        // changed after the method is called.
        while self.inner.is_valid() {
            if !self.state.should_delete(&self.inner.key()) {
                break;
            }

            // When the key's table id changes, flush the stats accumulated for
            // the previous table before starting a new accumulation run.
            if self.last_table_id.is_none_or(|last_table_id| {
                last_table_id != self.inner.key().user_key.table_id.table_id
            }) {
                self.add_last_table_stats();
                self.last_table_id = Some(self.inner.key().user_key.table_id.table_id);
            }
            // Negative deltas: these entries are being dropped from the output.
            self.last_table_stats.total_key_count -= 1;
            self.last_table_stats.total_key_size -= self.inner.key().encoded_len() as i64;
            self.last_table_stats.total_value_size -= self.inner.value().encoded_len() as i64;

            self.inner.next().await?;
        }
        self.add_last_table_stats();
        Ok(())
    }

    /// Merges the stats accumulated for the current table into
    /// `skipped_entry_table_stats` and resets the per-table accumulator.
    /// No-op when no table is currently being processed.
    fn add_last_table_stats(&mut self) {
        let Some(last_table_id) = self.last_table_id.take() else {
            return;
        };
        let delta = std::mem::take(&mut self.last_table_stats);
        let e = self
            .skipped_entry_table_stats
            .entry(last_table_id)
            .or_default();
        e.total_key_count += delta.total_key_count;
        e.total_key_size += delta.total_key_size;
        e.total_value_size += delta.total_value_size;
    }
}
112
impl<I: HummockIterator<Direction = Forward>, S: SkipWatermarkState> HummockIterator
    for SkipWatermarkIterator<I, S>
{
    type Direction = Forward;

    async fn next(&mut self) -> HummockResult<()> {
        self.inner.next().await?;
        // Check whether there is any remaining watermark and return early to
        // avoid calling the async `advance_key_and_watermark`, since in benchmark
        // performance downgrade is observed without this early return.
        if self.state.has_watermark() {
            self.advance_key_and_watermark().await?;
        }
        Ok(())
    }

    fn key(&self) -> FullKey<&[u8]> {
        self.inner.key()
    }

    fn value(&self) -> HummockValue<&[u8]> {
        self.inner.value()
    }

    fn is_valid(&self) -> bool {
        self.inner.is_valid()
    }

    async fn rewind(&mut self) -> HummockResult<()> {
        // Rebuild the watermark queue and stats, reposition the inner iterator,
        // then skip any leading entries filtered by the watermarks.
        self.reset_watermark();
        self.reset_skipped_entry_table_stats();
        self.inner.rewind().await?;
        self.advance_key_and_watermark().await?;
        Ok(())
    }

    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
        // Same as `rewind`, but repositioning to `key`.
        self.reset_watermark();
        self.reset_skipped_entry_table_stats();
        self.inner.seek(key).await?;
        self.advance_key_and_watermark().await?;
        Ok(())
    }

    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
        // Report entries skipped by watermark on top of the inner iterator's stats.
        add_table_stats_map(
            &mut stats.skipped_by_watermark_table_stats,
            &self.skipped_entry_table_stats,
        );
        self.inner.collect_local_statistic(stats)
    }

    fn value_meta(&self) -> ValueMeta {
        self.inner.value_meta()
    }
}
/// Watermark state for tables whose watermark column is the prefix of the
/// primary key: keys can be compared against the serialized watermark bytes
/// directly, without deserializing the row.
pub struct PkPrefixSkipWatermarkState {
    /// All read watermarks, keyed by table id.
    watermarks: BTreeMap<TableId, ReadTableWatermark>,
    /// Watermarks not yet passed by the iterator, ordered by (table id, vnode)
    /// thanks to the `BTreeMap` iteration order in `reset_watermark`.
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Bytes)>,
}
173
impl SkipWatermarkState for PkPrefixSkipWatermarkState {
    #[inline(always)]
    fn has_watermark(&self) -> bool {
        !self.remain_watermarks.is_empty()
    }

    /// Returns whether `key` should be skipped according to the first remaining
    /// watermark, advancing the watermark queue if the key has moved past it.
    fn should_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
        if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            let key_table_id = key.user_key.table_id;
            let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
            match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
                Ordering::Less => {
                    // The key is before the first watermark's (table_id, vnode): keep it.
                    return false;
                }
                Ordering::Equal => {
                    return direction.key_filter_by_watermark(inner_key, watermark);
                }
                Ordering::Greater => {
                    // The current key has advanced over the watermark.
                    // We may advance the watermark before advancing the key.
                    return self.advance_watermark(key);
                }
            }
        }
        false
    }

    /// Rebuilds `remain_watermarks` from all stored watermarks, flattened into
    /// (table id, vnode, direction, watermark bytes) tuples in (table id, vnode) order.
    fn reset_watermark(&mut self) {
        self.remain_watermarks = self
            .watermarks
            .iter()
            .flat_map(|(table_id, read_watermarks)| {
                read_watermarks
                    .vnode_watermarks
                    .iter()
                    .map(|(vnode, watermarks)| {
                        (
                            *table_id,
                            *vnode,
                            read_watermarks.direction,
                            watermarks.clone(),
                        )
                    })
            })
            .collect();
    }

    /// Advance watermark until no watermark remains or the first watermark can possibly
    /// filter out the current or future key.
    ///
    /// Return a flag indicating whether the current key will be filtered by the current watermark.
    fn advance_watermark(&mut self, key: &FullKey<&[u8]>) -> bool {
        let key_table_id = key.user_key.table_id;
        let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
        while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
                Ordering::Less => {
                    // Watermark is behind the key: drop it and inspect the next one.
                    self.remain_watermarks.pop_front();
                    continue;
                }
                Ordering::Equal => {
                    match direction {
                        WatermarkDirection::Ascending => {
                            match inner_key.cmp(watermark.as_ref()) {
                                Ordering::Less => {
                                    // The current key will be filtered by the watermark.
                                    // Return true to further advance the key.
                                    return true;
                                }
                                Ordering::Equal | Ordering::Greater => {
                                    // The current key has passed the watermark.
                                    // Advance the next watermark.
                                    self.remain_watermarks.pop_front();
                                    // Since it is impossible for a (table_id, vnode) tuple to have multiple
                                    // watermarks, after the pop_front, the next (table_id, vnode) must have
                                    // exceeded the current key, and we can directly return and mark that the
                                    // current key is not filtered by the watermark at the front.
                                    #[cfg(debug_assertions)]
                                    {
                                        if let Some((next_table_id, next_vnode, _, _)) =
                                            self.remain_watermarks.front()
                                        {
                                            assert!(
                                                (next_table_id, next_vnode)
                                                    > (&key_table_id, &key_vnode)
                                            );
                                        }
                                    }
                                    return false;
                                }
                            }
                        }
                        WatermarkDirection::Descending => {
                            return match inner_key.cmp(watermark.as_ref()) {
                                // Current key has not reached the watermark. Just return.
                                Ordering::Less | Ordering::Equal => false,
                                // Current key will be filtered by the watermark.
                                // Return true to further advance the key.
                                Ordering::Greater => true,
                            };
                        }
                    }
                }
                Ordering::Greater => {
                    // All remaining watermarks are ahead of the key: keep it.
                    return false;
                }
            }
        }
        false
    }
}
285
286impl PkPrefixSkipWatermarkState {
287    pub fn new(watermarks: BTreeMap<TableId, ReadTableWatermark>) -> Self {
288        Self {
289            remain_watermarks: VecDeque::new(),
290            watermarks,
291        }
292    }
293
294    pub fn from_safe_epoch_watermarks(
295        safe_epoch_watermarks: BTreeMap<u32, TableWatermarks>,
296    ) -> Self {
297        let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
298        Self::new(watermarks)
299    }
300}
301
/// Watermark state for tables whose watermark column is NOT the pk prefix:
/// each key must be deserialized with the table's pk serde so the watermark
/// column can be extracted and compared as a `Datum`.
pub struct NonPkPrefixSkipWatermarkState {
    /// All read watermarks, keyed by table id.
    watermarks: BTreeMap<TableId, ReadTableWatermark>,
    /// Watermarks not yet passed by the iterator, ordered by (table id, vnode);
    /// the watermark value is deserialized into a `Datum` up front.
    remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Datum)>,
    /// Catalog agent used to look up each table's serde for deserialization.
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,

    /// Cached (pk-prefix serde, watermark-column serde, watermark column index
    /// in pk) for `last_table_id`, refreshed when the processed table changes.
    last_serde: Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
    /// The table id for which `last_serde` was cached.
    last_table_id: Option<u32>,
}
310
311impl NonPkPrefixSkipWatermarkState {
312    pub fn new(
313        watermarks: BTreeMap<TableId, ReadTableWatermark>,
314        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
315    ) -> Self {
316        Self {
317            remain_watermarks: VecDeque::new(),
318            watermarks,
319            compaction_catalog_agent_ref,
320            last_serde: None,
321            last_table_id: None,
322        }
323    }
324
325    pub fn from_safe_epoch_watermarks(
326        safe_epoch_watermarks: BTreeMap<u32, TableWatermarks>,
327        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
328    ) -> Self {
329        let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks);
330        Self::new(watermarks, compaction_catalog_agent_ref)
331    }
332}
333
impl SkipWatermarkState for NonPkPrefixSkipWatermarkState {
    #[inline(always)]
    fn has_watermark(&self) -> bool {
        !self.remain_watermarks.is_empty()
    }

    /// Returns whether `key` should be skipped according to the first remaining
    /// watermark. The key is deserialized with the table's pk serde so the
    /// watermark column can be extracted and compared as a datum.
    fn should_delete(&mut self, key: &FullKey<&[u8]>) -> bool {
        if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            let key_table_id = key.user_key.table_id;
            {
                // Refresh the cached serde when the key's table changes.
                // NOTE(review): the serde is fetched for the front watermark's
                // `table_id`, while the cache key `last_table_id` is the key's
                // table id; the two only coincide in the `Equal` branch below —
                // confirm this divergence is intended.
                if self
                    .last_table_id
                    .is_none_or(|last_table_id| last_table_id != key_table_id.table_id())
                {
                    self.last_table_id = Some(key_table_id.table_id());
                    self.last_serde = self
                        .compaction_catalog_agent_ref
                        .watermark_serde(table_id.table_id());
                }
            }

            let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
            match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
                Ordering::Less => {
                    // The key is before the first watermark's (table_id, vnode): keep it.
                    return false;
                }
                Ordering::Equal => {
                    // Panics if the catalog provides no watermark serde for this table.
                    let (pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk) =
                        self.last_serde.as_ref().unwrap();
                    let row = pk_prefix_serde
                        .deserialize(inner_key)
                        .unwrap_or_else(|_| {
                            panic!("Failed to deserialize pk_prefix inner_key {:?} serde data_types {:?} order_types {:?}", inner_key, pk_prefix_serde.get_data_types(), pk_prefix_serde.get_order_types());
                        });
                    let watermark_col_in_pk = row.datum_at(*watermark_col_idx_in_pk);
                    return direction.datum_filter_by_watermark(
                        watermark_col_in_pk,
                        watermark,
                        watermark_col_serde.get_order_types()[0],
                    );
                }
                Ordering::Greater => {
                    // The current key has advanced over the watermark.
                    // We may advance the watermark before advancing the key.
                    return self.advance_watermark(key);
                }
            }
        }
        false
    }

    /// Rebuilds `remain_watermarks` from all stored watermarks, deserializing
    /// each watermark's bytes into a `Datum` with the table's watermark serde.
    fn reset_watermark(&mut self) {
        self.remain_watermarks = self
            .watermarks
            .iter()
            .flat_map(|(table_id, read_watermarks)| {
                let watermark_serde = self.compaction_catalog_agent_ref.watermark_serde(table_id.table_id()).map(|(_pk_serde, watermark_serde, _watermark_col_idx_in_pk)| watermark_serde);

                read_watermarks
                    .vnode_watermarks
                    .iter()
                    .map(move |(vnode, watermarks)| {
                        (
                            *table_id,
                            *vnode,
                            read_watermarks.direction,
                            {
                                // Panics if the serde is missing or the watermark
                                // bytes fail to decode into a single-column row.
                                let watermark_serde = watermark_serde.as_ref().unwrap();
                                let row = watermark_serde
                                .deserialize(watermarks).unwrap_or_else(|_| {
                                    panic!("Failed to deserialize watermark {:?} serde data_types {:?} order_types {:?}", watermarks, watermark_serde.get_data_types(), watermark_serde.get_order_types());
                                });
                                row[0].clone()
                            },
                        )
                    })
            })
            .collect();
    }

    /// Pops watermarks that the key has already passed; returns whether the
    /// current key is filtered by the watermark now at the front.
    /// Relies on `should_delete` having populated `last_serde` for this table.
    fn advance_watermark(&mut self, key: &FullKey<&[u8]>) -> bool {
        let key_table_id = key.user_key.table_id;
        let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
        while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() {
            match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
                Ordering::Less => {
                    // Watermark is behind the key: drop it and inspect the next one.
                    self.remain_watermarks.pop_front();
                    continue;
                }
                Ordering::Equal => {
                    let (pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk) =
                        self.last_serde.as_ref().unwrap();

                    let row = pk_prefix_serde
                        .deserialize(inner_key)
                        .unwrap_or_else(|_| {
                            panic!("Failed to deserialize pk_prefix inner_key {:?} serde data_types {:?} order_types {:?}", inner_key, pk_prefix_serde.get_data_types(), pk_prefix_serde.get_order_types());
                        });
                    let watermark_col_in_pk = row.datum_at(*watermark_col_idx_in_pk);

                    return direction.datum_filter_by_watermark(
                        watermark_col_in_pk,
                        watermark,
                        watermark_col_serde.get_order_types()[0],
                    );
                }
                Ordering::Greater => {
                    // All remaining watermarks are ahead of the key: keep it.
                    return false;
                }
            }
        }
        false
    }
}
448
/// Skip-watermark iterator for tables whose watermark column is the pk prefix.
pub type PkPrefixSkipWatermarkIterator<I> = SkipWatermarkIterator<I, PkPrefixSkipWatermarkState>;

/// Skip-watermark iterator for tables whose watermark column is not the pk prefix.
pub type NonPkPrefixSkipWatermarkIterator<I> =
    SkipWatermarkIterator<I, NonPkPrefixSkipWatermarkState>;
453
454#[cfg(test)]
455mod tests {
456    use std::collections::{BTreeMap, HashMap};
457    use std::iter::{empty, once};
458    use std::sync::Arc;
459
460    use bytes::Bytes;
461    use itertools::Itertools;
462    use risingwave_common::catalog::TableId;
463    use risingwave_common::hash::VirtualNode;
464    use risingwave_common::row::{OwnedRow, RowExt};
465    use risingwave_common::types::{DataType, ScalarImpl};
466    use risingwave_common::util::epoch::test_epoch;
467    use risingwave_common::util::row_serde::OrderedRowSerde;
468    use risingwave_common::util::sort_util::OrderType;
469    use risingwave_hummock_sdk::EpochWithGap;
470    use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey, gen_key_from_str};
471    use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, WatermarkDirection};
472
473    use super::PkPrefixSkipWatermarkState;
474    use crate::compaction_catalog_manager::{
475        CompactionCatalogAgent, FilterKeyExtractorImpl, FullKeyFilterKeyExtractor,
476    };
477    use crate::hummock::iterator::{
478        HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
479        NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator,
480    };
481    use crate::hummock::shared_buffer::shared_buffer_batch::{
482        SharedBufferBatch, SharedBufferValue,
483    };
484    use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode};
485
    /// Epoch used for all test batches.
    const EPOCH: u64 = test_epoch(1);
    /// Table id shared by all test data.
    const TABLE_ID: TableId = TableId::new(233);

    /// Asserts that `second` (the iterator under test) yields exactly the same
    /// key-value sequence as `first` (the expected iterator), optionally after
    /// seeking both to the key generated from `seek_key = (vnode, key_index)`.
    /// When `first` is `None`, `second` is expected to yield nothing.
    async fn assert_iter_eq(
        mut first: Option<impl HummockIterator>,
        mut second: impl HummockIterator,
        seek_key: Option<(usize, usize)>,
    ) {
        if let Some((vnode, key_index)) = seek_key {
            let (seek_key, _) = gen_key_value(vnode, key_index);
            let full_key = FullKey {
                user_key: UserKey {
                    table_id: TABLE_ID,
                    table_key: seek_key,
                },
                epoch_with_gap: EpochWithGap::new_from_epoch(EPOCH),
            };
            if let Some(first) = &mut first {
                first.seek(full_key.to_ref()).await.unwrap();
            }
            second.seek(full_key.to_ref()).await.unwrap()
        } else {
            if let Some(first) = &mut first {
                first.rewind().await.unwrap();
            }
            second.rewind().await.unwrap();
        }

        if let Some(first) = &mut first {
            while first.is_valid() {
                assert!(second.is_valid());
                let first_key = first.key();
                let second_key = second.key();
                assert_eq!(first_key, second_key);
                assert_eq!(first.value(), second.value());
                first.next().await.unwrap();
                second.next().await.unwrap();
            }
        }
        // Both iterators must be exhausted together.
        assert!(!second.is_valid());
    }
527
528    fn build_batch(
529        pairs: impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)>,
530        table_id: TableId,
531    ) -> Option<SharedBufferBatch> {
532        let pairs: Vec<_> = pairs.collect();
533        if pairs.is_empty() {
534            None
535        } else {
536            Some(SharedBufferBatch::for_test(pairs, EPOCH, table_id))
537        }
538    }
539
540    fn filter_with_watermarks(
541        iter: impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)>,
542        table_watermarks: ReadTableWatermark,
543    ) -> impl Iterator<Item = (TableKey<Bytes>, SharedBufferValue<Bytes>)> {
544        iter.filter(move |(key, _)| {
545            if let Some(watermark) = table_watermarks.vnode_watermarks.get(&key.vnode_part()) {
546                !table_watermarks
547                    .direction
548                    .key_filter_by_watermark(key.key_part(), watermark)
549            } else {
550                true
551            }
552        })
553    }
554
555    fn gen_inner_key(index: usize) -> String {
556        format!("key{:5}", index)
557    }
558
    /// Drives a `PkPrefixSkipWatermarkIterator` over a fixed data set with the
    /// given `(vnode, key_index)` watermarks and checks — for a full scan and
    /// for seeks to every data key plus one past the last — that it yields
    /// exactly what manually pre-filtering the data with the same watermarks yields.
    async fn test_watermark(
        watermarks: impl IntoIterator<Item = (usize, usize)>,
        direction: WatermarkDirection,
    ) {
        // (vnode, key_index) pairs of the test data set.
        let test_index: [(usize, usize); 7] =
            [(0, 2), (0, 3), (0, 4), (1, 1), (1, 3), (4, 2), (8, 1)];
        let items = test_index
            .iter()
            .map(|(vnode, key_index)| gen_key_value(*vnode, *key_index))
            .collect_vec();

        let read_watermark = ReadTableWatermark {
            direction,
            vnode_watermarks: BTreeMap::from_iter(watermarks.into_iter().map(
                |(vnode, key_index)| {
                    (
                        VirtualNode::from_index(vnode),
                        Bytes::from(gen_inner_key(key_index)),
                    )
                },
            )),
        };

        // Builds a fresh (expected iterator, iterator under test) pair.
        let gen_iters = || {
            // Expected: the data pre-filtered by the reference implementation.
            let batch = build_batch(
                filter_with_watermarks(items.clone().into_iter(), read_watermark.clone()),
                TABLE_ID,
            );

            let iter = PkPrefixSkipWatermarkIterator::new(
                build_batch(items.clone().into_iter(), TABLE_ID)
                    .unwrap()
                    .into_forward_iter(),
                PkPrefixSkipWatermarkState::new(BTreeMap::from_iter(once((
                    TABLE_ID,
                    read_watermark.clone(),
                )))),
            );
            (batch.map(|batch| batch.into_forward_iter()), iter)
        };
        // Full scan.
        let (first, second) = gen_iters();
        assert_iter_eq(first, second, None).await;
        // Seek to every existing key.
        for (vnode, key_index) in &test_index {
            let (first, second) = gen_iters();
            assert_iter_eq(first, second, Some((*vnode, *key_index))).await;
        }
        // Seek past the last key.
        let (last_vnode, last_key_index) = test_index.last().unwrap();
        let (first, second) = gen_iters();
        assert_iter_eq(first, second, Some((*last_vnode, last_key_index + 1))).await;
    }
609
    /// Generates a deterministic (key, value) pair for the given vnode and
    /// key index, with the value encoding both for easy debugging.
    fn gen_key_value(vnode: usize, index: usize) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
        (
            gen_key_from_str(VirtualNode::from_index(vnode), &gen_inner_key(index)),
            SharedBufferValue::Insert(Bytes::copy_from_slice(
                format!("{}-value-{}", vnode, index).as_bytes(),
            )),
        )
    }
618
    /// With no watermarks, the iterator must pass everything through unchanged.
    #[tokio::test]
    async fn test_no_watermark() {
        test_watermark(empty(), WatermarkDirection::Ascending).await;
        test_watermark(empty(), WatermarkDirection::Descending).await;
    }
624
    /// Watermarks that no data key violates must filter nothing.
    #[tokio::test]
    async fn test_too_low_watermark() {
        test_watermark(vec![(0, 0)], WatermarkDirection::Ascending).await;
        test_watermark(vec![(0, 10)], WatermarkDirection::Descending).await;
    }
630
    /// A single-vnode watermark filters only that vnode's keys, in both directions.
    #[tokio::test]
    async fn test_single_watermark() {
        test_watermark(vec![(0, 3)], WatermarkDirection::Ascending).await;
        test_watermark(vec![(0, 3)], WatermarkDirection::Descending).await;
    }
636
    /// A watermark on a vnode with no data must not affect other vnodes.
    #[tokio::test]
    async fn test_watermark_vnode_no_data() {
        test_watermark(vec![(3, 3)], WatermarkDirection::Ascending).await;
        test_watermark(vec![(3, 3)], WatermarkDirection::Descending).await;
    }
642
    /// Watermarks chosen so that every entry in the data set is filtered out.
    #[tokio::test]
    async fn test_filter_all() {
        test_watermark(
            vec![(0, 5), (1, 4), (2, 0), (4, 3), (8, 2)],
            WatermarkDirection::Ascending,
        )
        .await;
        test_watermark(
            vec![(0, 0), (1, 0), (2, 0), (4, 0), (8, 0)],
            WatermarkDirection::Descending,
        )
        .await;
    }
656
    /// Exercises advancing across multiple vnodes' watermarks within one scan.
    #[tokio::test]
    async fn test_advance_multi_vnode() {
        test_watermark(vec![(1, 2), (8, 0)], WatermarkDirection::Ascending).await;
    }
661
662    #[tokio::test]
663    async fn test_non_pk_prefix_watermark() {
664        fn gen_key_value(
665            vnode: usize,
666            col_0: i32,
667            col_1: i32,
668            col_2: i32,
669            col_3: i32,
670            pk_serde: &OrderedRowSerde,
671            pk_indices: &[usize],
672        ) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
673            let r = OwnedRow::new(vec![
674                Some(ScalarImpl::Int32(col_0)),
675                Some(ScalarImpl::Int32(col_1)),
676                Some(ScalarImpl::Int32(col_2)), // watermark column
677                Some(ScalarImpl::Int32(col_3)),
678            ]);
679
680            let pk = r.project(pk_indices);
681
682            let k1 = serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode));
683            let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice(
684                format!("{}-value-{}", vnode, col_2).as_bytes(),
685            ));
686            (k1, v1)
687        }
688
689        let watermark_direction = WatermarkDirection::Ascending;
690
691        let watermark_col_serde =
692            OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]);
693        let pk_serde = OrderedRowSerde::new(
694            vec![DataType::Int32, DataType::Int32, DataType::Int32],
695            vec![
696                OrderType::ascending(),
697                OrderType::ascending(),
698                OrderType::ascending(),
699            ],
700        );
701
702        let pk_indices = vec![0, 2, 3];
703        let watermark_col_idx_in_pk = 1;
704
705        {
706            // test single vnode
707            let shared_buffer_batch = {
708                let mut kv_pairs = (0..10)
709                    .map(|i| gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices))
710                    .collect_vec();
711                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
712                build_batch(kv_pairs.into_iter(), TABLE_ID)
713            }
714            .unwrap();
715
716            {
717                // empty read watermark
718                let read_watermark = ReadTableWatermark {
719                    direction: watermark_direction,
720                    vnode_watermarks: BTreeMap::default(),
721                };
722
723                let compaction_catalog_agent_ref =
724                    CompactionCatalogAgent::for_test(vec![TABLE_ID.into()]);
725
726                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
727                    shared_buffer_batch.clone().into_forward_iter(),
728                    NonPkPrefixSkipWatermarkState::new(
729                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
730                        compaction_catalog_agent_ref,
731                    ),
732                );
733
734                iter.rewind().await.unwrap();
735                assert!(iter.is_valid());
736                for i in 0..10 {
737                    let (k, _v) = gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices);
738                    assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref());
739                    iter.next().await.unwrap();
740                }
741                assert!(!iter.is_valid());
742            }
743
744            {
745                // test watermark
746                let watermark = {
747                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
748                    serialize_pk(r1, &watermark_col_serde)
749                };
750
751                let read_watermark = ReadTableWatermark {
752                    direction: watermark_direction,
753                    vnode_watermarks: BTreeMap::from_iter(once((
754                        VirtualNode::from_index(0),
755                        watermark.clone(),
756                    ))),
757                };
758
759                let full_key_filter_key_extractor =
760                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
761
762                let table_id_to_vnode =
763                    HashMap::from_iter(once((TABLE_ID.table_id(), VirtualNode::COUNT_FOR_TEST)));
764
765                let table_id_to_watermark_serde = HashMap::from_iter(once((
766                    TABLE_ID.table_id(),
767                    Some((
768                        pk_serde.clone(),
769                        watermark_col_serde.clone(),
770                        watermark_col_idx_in_pk,
771                    )),
772                )));
773
774                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
775                    full_key_filter_key_extractor,
776                    table_id_to_vnode,
777                    table_id_to_watermark_serde,
778                ));
779
780                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
781                    shared_buffer_batch.clone().into_forward_iter(),
782                    NonPkPrefixSkipWatermarkState::new(
783                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
784                        compaction_catalog_agent_ref,
785                    ),
786                );
787
788                iter.rewind().await.unwrap();
789                assert!(iter.is_valid());
790                for i in 5..10 {
791                    let (k, _v) = gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices);
792                    assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref());
793                    iter.next().await.unwrap();
794                }
795                assert!(!iter.is_valid());
796            }
797        }
798
799        {
800            // test multi vnode
801            let shared_buffer_batch = {
802                let mut kv_pairs = (0..10_i32)
803                    .map(|i| gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices))
804                    .collect_vec();
805
806                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
807                build_batch(kv_pairs.into_iter(), TABLE_ID)
808            };
809
810            {
811                // test watermark
812                let watermark = {
813                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
814                    serialize_pk(r1, &watermark_col_serde)
815                };
816
817                let read_watermark = ReadTableWatermark {
818                    direction: watermark_direction,
819                    vnode_watermarks: BTreeMap::from_iter(
820                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
821                    ),
822                };
823
824                let full_key_filter_key_extractor =
825                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
826
827                let table_id_to_vnode =
828                    HashMap::from_iter(once((TABLE_ID.table_id(), VirtualNode::COUNT_FOR_TEST)));
829
830                let table_id_to_watermark_serde = HashMap::from_iter(once((
831                    TABLE_ID.table_id(),
832                    Some((
833                        pk_serde.clone(),
834                        watermark_col_serde.clone(),
835                        watermark_col_idx_in_pk,
836                    )),
837                )));
838
839                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
840                    full_key_filter_key_extractor,
841                    table_id_to_vnode,
842                    table_id_to_watermark_serde,
843                ));
844
845                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
846                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
847                    NonPkPrefixSkipWatermarkState::new(
848                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
849                        compaction_catalog_agent_ref,
850                    ),
851                );
852
853                iter.rewind().await.unwrap();
854                assert!(iter.is_valid());
855                let mut kv_pairs = (5..10_i32)
856                    .map(|i| {
857                        let (k, v) =
858                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
859                        (k, v)
860                    })
861                    .collect_vec();
862                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
863                let mut index = 0;
864                while iter.is_valid() {
865                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
866                    iter.next().await.unwrap();
867                    index += 1;
868                }
869            }
870
871            {
                // test watermark — NOTE(review): this block is a verbatim duplicate of the one above; consider removing it or varying the watermark/direction
873                let watermark = {
874                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
875                    serialize_pk(r1, &watermark_col_serde)
876                };
877
878                let read_watermark = ReadTableWatermark {
879                    direction: watermark_direction,
880                    vnode_watermarks: BTreeMap::from_iter(
881                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
882                    ),
883                };
884
885                let full_key_filter_key_extractor =
886                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
887
888                let table_id_to_vnode =
889                    HashMap::from_iter(once((TABLE_ID.table_id(), VirtualNode::COUNT_FOR_TEST)));
890
891                let table_id_to_watermark_serde = HashMap::from_iter(once((
892                    TABLE_ID.table_id(),
893                    Some((
894                        pk_serde.clone(),
895                        watermark_col_serde.clone(),
896                        watermark_col_idx_in_pk,
897                    )),
898                )));
899
900                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
901                    full_key_filter_key_extractor,
902                    table_id_to_vnode,
903                    table_id_to_watermark_serde,
904                ));
905
906                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
907                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
908                    NonPkPrefixSkipWatermarkState::new(
909                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
910                        compaction_catalog_agent_ref,
911                    ),
912                );
913
914                iter.rewind().await.unwrap();
915                assert!(iter.is_valid());
916                let mut kv_pairs = (5..10_i32)
917                    .map(|i| {
918                        let (k, v) =
919                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
920                        (k, v)
921                    })
922                    .collect_vec();
923                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
924                let mut index = 0;
925                while iter.is_valid() {
926                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
927                    iter.next().await.unwrap();
928                    index += 1;
929                }
930            }
931
932            {
933                // test watermark Descending
934                let watermark = {
935                    let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
936                    serialize_pk(r1, &watermark_col_serde)
937                };
938
939                let read_watermark = ReadTableWatermark {
940                    direction: WatermarkDirection::Descending,
941                    vnode_watermarks: BTreeMap::from_iter(
942                        (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())),
943                    ),
944                };
945
946                let full_key_filter_key_extractor =
947                    FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
948
949                let table_id_to_vnode =
950                    HashMap::from_iter(once((TABLE_ID.table_id(), VirtualNode::COUNT_FOR_TEST)));
951
952                let table_id_to_watermark_serde = HashMap::from_iter(once((
953                    TABLE_ID.table_id(),
954                    Some((
955                        pk_serde.clone(),
956                        watermark_col_serde.clone(),
957                        watermark_col_idx_in_pk,
958                    )),
959                )));
960
961                let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
962                    full_key_filter_key_extractor,
963                    table_id_to_vnode,
964                    table_id_to_watermark_serde,
965                ));
966
967                let mut iter = NonPkPrefixSkipWatermarkIterator::new(
968                    shared_buffer_batch.clone().unwrap().into_forward_iter(),
969                    NonPkPrefixSkipWatermarkState::new(
970                        BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))),
971                        compaction_catalog_agent_ref,
972                    ),
973                );
974
975                iter.rewind().await.unwrap();
976                assert!(iter.is_valid());
977                let mut kv_pairs = (0..=5_i32)
978                    .map(|i| {
979                        let (k, v) =
980                            gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices);
981                        (k, v)
982                    })
983                    .collect_vec();
984                kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
985                let mut index = 0;
986                while iter.is_valid() {
987                    assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
988                    iter.next().await.unwrap();
989                    index += 1;
990                }
991            }
992        }
993    }
994
995    #[tokio::test]
996    async fn test_mix_watermark() {
997        fn gen_key_value(
998            vnode: usize,
999            col_0: i32,
1000            col_1: i32,
1001            col_2: i32,
1002            col_3: i32,
1003            pk_serde: &OrderedRowSerde,
1004            pk_indices: &[usize],
1005        ) -> (TableKey<Bytes>, SharedBufferValue<Bytes>) {
1006            let r = OwnedRow::new(vec![
1007                Some(ScalarImpl::Int32(col_0)),
1008                Some(ScalarImpl::Int32(col_1)),
1009                Some(ScalarImpl::Int32(col_2)), // watermark column
1010                Some(ScalarImpl::Int32(col_3)),
1011            ]);
1012
1013            let pk = r.project(pk_indices);
1014
1015            let k1 = serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode));
1016            let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice(
1017                format!("{}-value-{}-{}-{}-{}", vnode, col_0, col_1, col_2, col_3).as_bytes(),
1018            ));
1019
1020            (k1, v1)
1021        }
1022
1023        let watermark_col_serde =
1024            OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]);
1025        let t1_pk_serde = OrderedRowSerde::new(
1026            vec![DataType::Int32, DataType::Int32, DataType::Int32],
1027            vec![
1028                OrderType::ascending(),
1029                OrderType::ascending(),
1030                OrderType::ascending(),
1031            ],
1032        );
1033
1034        let t1_pk_indices = vec![0, 2, 3];
1035        let t1_watermark_col_idx_in_pk = 1;
1036
1037        let t2_pk_indices = vec![0, 1, 2];
1038
1039        let t2_pk_serde = OrderedRowSerde::new(
1040            vec![DataType::Int32, DataType::Int32, DataType::Int32],
1041            vec![
1042                OrderType::ascending(),
1043                OrderType::ascending(),
1044                OrderType::ascending(),
1045            ],
1046        );
1047
1048        let t1_id = TABLE_ID;
1049        let t2_id = TableId::from(t1_id.table_id() + 1);
1050
1051        let t1_shared_buffer_batch = {
1052            let mut kv_pairs = (0..10_i32)
1053                .map(|i| {
1054                    gen_key_value(
1055                        i as usize % 2,
1056                        10 - i,
1057                        0,
1058                        i,
1059                        i,
1060                        &t1_pk_serde,
1061                        &t1_pk_indices,
1062                    )
1063                })
1064                .collect_vec();
1065
1066            kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1067            build_batch(kv_pairs.into_iter(), t1_id).unwrap()
1068        };
1069
1070        let t2_shared_buffer_batch = {
1071            let mut kv_pairs = (0..10_i32)
1072                .map(|i| {
1073                    gen_key_value(
1074                        i as usize % 2,
1075                        10 - i,
1076                        0,
1077                        0,
1078                        0,
1079                        &t2_pk_serde,
1080                        &t2_pk_indices,
1081                    )
1082                })
1083                .collect_vec();
1084
1085            kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1086            build_batch(kv_pairs.into_iter(), t2_id).unwrap()
1087        };
1088
1089        let t1_watermark = {
1090            let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
1091            serialize_pk(r1, &watermark_col_serde)
1092        };
1093
1094        let t1_read_watermark = ReadTableWatermark {
1095            direction: WatermarkDirection::Ascending,
1096            vnode_watermarks: BTreeMap::from_iter(
1097                (0..2).map(|i| (VirtualNode::from_index(i), t1_watermark.clone())),
1098            ),
1099        };
1100
1101        let t2_watermark = {
1102            let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]);
1103            serialize_pk(r1, &watermark_col_serde)
1104        };
1105
1106        let t2_read_watermark = ReadTableWatermark {
1107            direction: WatermarkDirection::Ascending,
1108            vnode_watermarks: BTreeMap::from_iter(
1109                (0..2).map(|i| (VirtualNode::from_index(i), t2_watermark.clone())),
1110            ),
1111        };
1112
1113        {
1114            // test non pk prefix watermark
1115            let t1_iter = t1_shared_buffer_batch.clone().into_forward_iter();
1116            let t2_iter = t2_shared_buffer_batch.clone().into_forward_iter();
1117            let iter_vec = vec![t1_iter, t2_iter];
1118            let merge_iter = MergeIterator::new(iter_vec);
1119
1120            let full_key_filter_key_extractor =
1121                FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
1122
1123            let table_id_to_vnode =
1124                HashMap::from_iter(once((TABLE_ID.table_id(), VirtualNode::COUNT_FOR_TEST)));
1125
1126            let table_id_to_watermark_serde = HashMap::from_iter(once((
1127                t1_id.table_id(),
1128                Some((
1129                    t1_pk_serde.clone(),
1130                    watermark_col_serde.clone(),
1131                    t1_watermark_col_idx_in_pk,
1132                )),
1133            )));
1134
1135            let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
1136                full_key_filter_key_extractor,
1137                table_id_to_vnode,
1138                table_id_to_watermark_serde,
1139            ));
1140
1141            let mut iter = NonPkPrefixSkipWatermarkIterator::new(
1142                merge_iter,
1143                NonPkPrefixSkipWatermarkState::new(
1144                    BTreeMap::from_iter(once((TABLE_ID, t1_read_watermark.clone()))),
1145                    compaction_catalog_agent_ref,
1146                ),
1147            );
1148
1149            iter.rewind().await.unwrap();
1150            assert!(iter.is_valid());
1151            let mut t1_kv_pairs = (5..10_i32)
1152                .map(|i| {
1153                    let (k, v) = gen_key_value(
1154                        i as usize % 2,
1155                        10 - i,
1156                        0,
1157                        i,
1158                        i,
1159                        &t1_pk_serde,
1160                        &t1_pk_indices,
1161                    );
1162                    (k, v)
1163                })
1164                .collect_vec();
1165
1166            t1_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1167
1168            let mut t2_kv_pairs = (0..10_i32)
1169                .map(|i| {
1170                    gen_key_value(
1171                        i as usize % 2,
1172                        10 - i,
1173                        0,
1174                        0,
1175                        0,
1176                        &t2_pk_serde,
1177                        &t2_pk_indices,
1178                    )
1179                })
1180                .collect_vec();
1181
1182            t2_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1183            let mut index = 0;
1184            for _ in 0..t1_kv_pairs.len() {
1185                assert!(t1_kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
1186                iter.next().await.unwrap();
1187                index += 1;
1188            }
1189
1190            assert!(iter.is_valid());
1191            assert_eq!(t1_kv_pairs.len(), index);
1192
1193            index = 0;
1194            for _ in 0..t2_kv_pairs.len() {
1195                assert!(t2_kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref());
1196                iter.next().await.unwrap();
1197                index += 1;
1198            }
1199
1200            assert!(!iter.is_valid());
1201            assert_eq!(t2_kv_pairs.len(), index);
1202        }
1203
1204        {
1205            let t1_iter = t1_shared_buffer_batch.clone().into_forward_iter();
1206            let t2_iter = t2_shared_buffer_batch.clone().into_forward_iter();
1207            let iter_vec = vec![t1_iter, t2_iter];
1208            let merge_iter = MergeIterator::new(iter_vec);
1209
1210            let full_key_filter_key_extractor =
1211                FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
1212
1213            let table_id_to_vnode = HashMap::from_iter(
1214                vec![
1215                    (t1_id.table_id(), VirtualNode::COUNT_FOR_TEST),
1216                    (t2_id.table_id(), VirtualNode::COUNT_FOR_TEST),
1217                ]
1218                .into_iter(),
1219            );
1220
1221            let table_id_to_watermark_serde = HashMap::from_iter(
1222                vec![
1223                    (
1224                        t1_id.table_id(),
1225                        Some((
1226                            t1_pk_serde.clone(),
1227                            watermark_col_serde.clone(),
1228                            t1_watermark_col_idx_in_pk,
1229                        )),
1230                    ),
1231                    (t2_id.table_id(), None),
1232                ]
1233                .into_iter(),
1234            );
1235
1236            let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
1237                full_key_filter_key_extractor,
1238                table_id_to_vnode,
1239                table_id_to_watermark_serde,
1240            ));
1241
1242            let non_pk_prefix_iter = NonPkPrefixSkipWatermarkIterator::new(
1243                merge_iter,
1244                NonPkPrefixSkipWatermarkState::new(
1245                    BTreeMap::from_iter(once((t1_id, t1_read_watermark.clone()))),
1246                    compaction_catalog_agent_ref.clone(),
1247                ),
1248            );
1249
1250            let mut mix_iter = PkPrefixSkipWatermarkIterator::new(
1251                non_pk_prefix_iter,
1252                PkPrefixSkipWatermarkState::new(BTreeMap::from_iter(once((
1253                    t2_id,
1254                    t2_read_watermark.clone(),
1255                )))),
1256            );
1257
1258            mix_iter.rewind().await.unwrap();
1259            assert!(mix_iter.is_valid());
1260
1261            let mut t1_kv_pairs = (5..10_i32)
1262                .map(|i| {
1263                    let (k, v) = gen_key_value(
1264                        i as usize % 2,
1265                        10 - i,
1266                        0,
1267                        i,
1268                        i,
1269                        &t1_pk_serde,
1270                        &t1_pk_indices,
1271                    );
1272                    (k, v)
1273                })
1274                .collect_vec();
1275
1276            t1_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1277
1278            let mut t2_kv_pairs = (0..=5_i32)
1279                .map(|i| {
1280                    gen_key_value(
1281                        i as usize % 2,
1282                        10 - i,
1283                        0,
1284                        0,
1285                        0,
1286                        &t2_pk_serde,
1287                        &t2_pk_indices,
1288                    )
1289                })
1290                .collect_vec();
1291
1292            t2_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1293
1294            let mut index = 0;
1295            for _ in 0..t1_kv_pairs.len() {
1296                assert!(
1297                    t1_kv_pairs[index].0.as_ref() == mix_iter.key().user_key.table_key.as_ref()
1298                );
1299                mix_iter.next().await.unwrap();
1300                index += 1;
1301            }
1302
1303            assert!(mix_iter.is_valid());
1304            assert_eq!(t1_kv_pairs.len(), index);
1305
1306            index = 0;
1307
1308            for _ in 0..t2_kv_pairs.len() {
1309                assert!(
1310                    t2_kv_pairs[index].0.as_ref() == mix_iter.key().user_key.table_key.as_ref()
1311                );
1312                mix_iter.next().await.unwrap();
1313                index += 1;
1314            }
1315
1316            assert!(!mix_iter.is_valid());
1317            assert_eq!(t2_kv_pairs.len(), index);
1318        }
1319    }
1320}