risingwave_hummock_sdk/
table_watermark.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
17use std::fmt::Display;
18use std::mem::size_of;
19use std::ops::Bound::{Excluded, Included, Unbounded};
20use std::sync::Arc;
21
22use bytes::Bytes;
23use itertools::Itertools;
24use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
25use risingwave_common::catalog::TableId;
26use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
27use risingwave_common::types::ToDatumRef;
28use risingwave_common::util::sort_util::{OrderType, cmp_datum};
29use risingwave_common_estimate_size::EstimateSize;
30use risingwave_pb::hummock::table_watermarks::PbEpochNewWatermarks;
31use risingwave_pb::hummock::{
32    PbVnodeWatermark, PbWatermarkSerdeType, TableWatermarks as PbTableWatermarks,
33};
34use tracing::{debug, warn};
35
36use crate::HummockEpoch;
37use crate::key::{TableKey, TableKeyRange, prefix_slice_with_vnode, vnode};
38
39#[derive(Clone)]
40pub struct ReadTableWatermark {
41    pub direction: WatermarkDirection,
42    pub vnode_watermarks: BTreeMap<VirtualNode, Bytes>,
43}
44
45#[derive(Clone)]
46pub struct PkPrefixTableWatermarksIndex {
47    pub watermark_direction: WatermarkDirection,
48    // later epoch at the back
49    pub staging_watermarks: VecDeque<(HummockEpoch, Arc<[VnodeWatermark]>)>,
50    pub committed_watermarks: Option<Arc<TableWatermarks>>,
51    latest_epoch: HummockEpoch,
52    committed_epoch: Option<HummockEpoch>,
53}
54
55impl PkPrefixTableWatermarksIndex {
56    pub fn new(
57        watermark_direction: WatermarkDirection,
58        first_epoch: HummockEpoch,
59        first_vnode_watermark: Vec<VnodeWatermark>,
60        committed_epoch: Option<HummockEpoch>,
61    ) -> Self {
62        if let Some(committed_epoch) = committed_epoch {
63            assert!(first_epoch > committed_epoch);
64        }
65        Self {
66            watermark_direction,
67            staging_watermarks: VecDeque::from_iter([(
68                first_epoch,
69                Arc::from(first_vnode_watermark),
70            )]),
71            committed_watermarks: None,
72            latest_epoch: first_epoch,
73            committed_epoch,
74        }
75    }
76
77    pub fn new_committed(
78        committed_watermarks: Arc<TableWatermarks>,
79        committed_epoch: HummockEpoch,
80    ) -> Self {
81        assert_eq!(
82            committed_watermarks.watermark_type,
83            WatermarkSerdeType::PkPrefix
84        );
85        Self {
86            watermark_direction: committed_watermarks.direction,
87            staging_watermarks: VecDeque::new(),
88            committed_epoch: Some(committed_epoch),
89            latest_epoch: committed_epoch,
90            committed_watermarks: Some(committed_watermarks),
91        }
92    }
93
94    pub fn read_watermark(&self, vnode: VirtualNode, epoch: HummockEpoch) -> Option<Bytes> {
95        // iterate from new epoch to old epoch
96        for (watermark_epoch, vnode_watermark_list) in self.staging_watermarks.iter().rev().chain(
97            self.committed_watermarks
98                .iter()
99                .flat_map(|watermarks| watermarks.watermarks.iter().rev()),
100        ) {
101            if *watermark_epoch > epoch {
102                continue;
103            }
104            for vnode_watermark in vnode_watermark_list.as_ref() {
105                if vnode_watermark.vnode_bitmap.is_set(vnode.to_index()) {
106                    return Some(vnode_watermark.watermark.clone());
107                }
108            }
109        }
110        None
111    }
112
113    pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
114        self.read_watermark(vnode, HummockEpoch::MAX)
115    }
116
117    pub fn rewrite_range_with_table_watermark(
118        &self,
119        epoch: HummockEpoch,
120        key_range: &mut TableKeyRange,
121    ) {
122        let vnode = vnode(key_range);
123        if let Some(watermark) = self.read_watermark(vnode, epoch) {
124            match self.watermark_direction {
125                WatermarkDirection::Ascending => {
126                    let overwrite_start_key = match &key_range.0 {
127                        Included(start_key) | Excluded(start_key) => {
128                            start_key.key_part() < watermark
129                        }
130                        Unbounded => true,
131                    };
132                    if overwrite_start_key {
133                        let watermark_key = TableKey(prefix_slice_with_vnode(vnode, &watermark));
134                        let fully_filtered = match &key_range.1 {
135                            Included(end_key) => end_key < &watermark_key,
136                            Excluded(end_key) => end_key <= &watermark_key,
137                            Unbounded => false,
138                        };
139                        if fully_filtered {
140                            key_range.1 = Excluded(watermark_key.clone());
141                        }
142                        key_range.0 = Included(watermark_key);
143                    }
144                }
145                WatermarkDirection::Descending => {
146                    let overwrite_end_key = match &key_range.1 {
147                        Included(end_key) | Excluded(end_key) => end_key.key_part() > watermark,
148                        Unbounded => true,
149                    };
150                    if overwrite_end_key {
151                        let watermark_key = TableKey(prefix_slice_with_vnode(vnode, &watermark));
152                        let fully_filtered = match &key_range.0 {
153                            Included(start_key) => start_key > &watermark_key,
154                            Excluded(start_key) => start_key >= &watermark_key,
155                            Unbounded => false,
156                        };
157                        if fully_filtered {
158                            *key_range = (Included(watermark_key.clone()), Excluded(watermark_key));
159                        } else {
160                            key_range.1 = Included(watermark_key);
161                        }
162                    }
163                }
164            }
165        }
166    }
167
168    pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
169        let mut ret = Vec::with_capacity(watermarks.len());
170        for watermark in watermarks.drain(..) {
171            let vnode_count = watermark.vnode_count();
172
173            let mut regress_vnodes = None;
174            for vnode in watermark.vnode_bitmap.iter_vnodes() {
175                if let Some(prev_watermark) = self.latest_watermark(vnode) {
176                    let is_regress = match self.direction() {
177                        WatermarkDirection::Ascending => prev_watermark > watermark.watermark,
178                        WatermarkDirection::Descending => prev_watermark < watermark.watermark,
179                    };
180                    if is_regress {
181                        warn!(
182                            "table watermark regress: {:?} {} {:?} {:?}",
183                            self.direction(),
184                            vnode,
185                            watermark.watermark,
186                            prev_watermark
187                        );
188                        regress_vnodes
189                            .get_or_insert_with(|| BitmapBuilder::zeroed(vnode_count))
190                            .set(vnode.to_index(), true);
191                    }
192                }
193            }
194            if let Some(regress_vnodes) = regress_vnodes {
195                let mut bitmap_builder = None;
196                for vnode in watermark.vnode_bitmap.iter_vnodes() {
197                    let vnode_index = vnode.to_index();
198                    if !regress_vnodes.is_set(vnode_index) {
199                        bitmap_builder
200                            .get_or_insert_with(|| BitmapBuilder::zeroed(vnode_count))
201                            .set(vnode_index, true);
202                    }
203                }
204                if let Some(bitmap_builder) = bitmap_builder {
205                    ret.push(VnodeWatermark::new(
206                        Arc::new(bitmap_builder.finish()),
207                        watermark.watermark,
208                    ));
209                }
210            } else {
211                // no vnode has regress watermark
212                ret.push(watermark);
213            }
214        }
215        *watermarks = ret;
216    }
217
218    pub fn direction(&self) -> WatermarkDirection {
219        self.watermark_direction
220    }
221
222    pub fn add_epoch_watermark(
223        &mut self,
224        epoch: HummockEpoch,
225        vnode_watermark_list: Arc<[VnodeWatermark]>,
226        direction: WatermarkDirection,
227    ) {
228        assert!(epoch > self.latest_epoch);
229        assert_eq!(self.watermark_direction, direction);
230        self.latest_epoch = epoch;
231        #[cfg(debug_assertions)]
232        if !vnode_watermark_list.is_empty() {
233            let vnode_count = vnode_watermark_list[0].vnode_count();
234            let mut vnode_is_set = BitmapBuilder::zeroed(vnode_count);
235            for vnode_watermark in vnode_watermark_list.as_ref() {
236                for vnode in vnode_watermark.vnode_bitmap.iter_ones() {
237                    assert!(!vnode_is_set.is_set(vnode));
238                    vnode_is_set.set(vnode, true);
239                    let vnode = VirtualNode::from_index(vnode);
240                    if let Some(prev_watermark) = self.latest_watermark(vnode) {
241                        match self.watermark_direction {
242                            WatermarkDirection::Ascending => {
243                                assert!(vnode_watermark.watermark >= prev_watermark);
244                            }
245                            WatermarkDirection::Descending => {
246                                assert!(vnode_watermark.watermark <= prev_watermark);
247                            }
248                        };
249                    }
250                }
251            }
252        }
253        self.staging_watermarks
254            .push_back((epoch, vnode_watermark_list));
255    }
256
257    pub fn apply_committed_watermarks(
258        &mut self,
259        committed_watermark: Arc<TableWatermarks>,
260        committed_epoch: HummockEpoch,
261    ) {
262        assert_eq!(
263            committed_watermark.watermark_type,
264            WatermarkSerdeType::PkPrefix
265        );
266        assert_eq!(self.watermark_direction, committed_watermark.direction);
267        if let Some(prev_committed_epoch) = self.committed_epoch {
268            assert!(prev_committed_epoch <= committed_epoch);
269            if prev_committed_epoch == committed_epoch {
270                return;
271            }
272        }
273        if self.latest_epoch < committed_epoch {
274            debug!(
275                latest_epoch = self.latest_epoch,
276                committed_epoch, "committed_epoch exceed table watermark latest_epoch"
277            );
278            self.latest_epoch = committed_epoch;
279        }
280        self.committed_epoch = Some(committed_epoch);
281        self.committed_watermarks = Some(committed_watermark);
282        // keep only watermark higher than committed epoch
283        while let Some((old_epoch, _)) = self.staging_watermarks.front()
284            && *old_epoch <= committed_epoch
285        {
286            let _ = self.staging_watermarks.pop_front();
287        }
288    }
289}
290
291#[derive(Debug, Clone, Copy, Eq, PartialEq)]
292pub enum WatermarkDirection {
293    Ascending,
294    Descending,
295}
296
297impl Display for WatermarkDirection {
298    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
299        match self {
300            WatermarkDirection::Ascending => write!(f, "Ascending"),
301            WatermarkDirection::Descending => write!(f, "Descending"),
302        }
303    }
304}
305
306impl WatermarkDirection {
307    pub fn key_filter_by_watermark(
308        &self,
309        key: impl AsRef<[u8]>,
310        watermark: impl AsRef<[u8]>,
311    ) -> bool {
312        let key = key.as_ref();
313        let watermark = watermark.as_ref();
314        match self {
315            WatermarkDirection::Ascending => key < watermark,
316            WatermarkDirection::Descending => key > watermark,
317        }
318    }
319
320    pub fn datum_filter_by_watermark(
321        &self,
322        watermark_col_in_pk: impl ToDatumRef,
323        watermark: impl ToDatumRef,
324        order_type: OrderType,
325    ) -> bool {
326        let watermark_col_in_pk = watermark_col_in_pk.to_datum_ref();
327        let watermark = watermark.to_datum_ref();
328        match self {
329            WatermarkDirection::Ascending => {
330                // watermark_col_in_pk < watermark
331                cmp_datum(watermark_col_in_pk, watermark, order_type).is_lt()
332            }
333            WatermarkDirection::Descending => {
334                //  watermark_col_in_pk > watermark
335                cmp_datum(watermark_col_in_pk, watermark, order_type).is_gt()
336            }
337        }
338    }
339
340    pub fn is_ascending(&self) -> bool {
341        match self {
342            WatermarkDirection::Ascending => true,
343            WatermarkDirection::Descending => false,
344        }
345    }
346}
347
348#[derive(Clone, Debug, PartialEq, EstimateSize)]
349pub struct VnodeWatermark {
350    vnode_bitmap: Arc<Bitmap>,
351    watermark: Bytes,
352}
353
354impl VnodeWatermark {
355    pub fn new(vnode_bitmap: Arc<Bitmap>, watermark: Bytes) -> Self {
356        Self {
357            vnode_bitmap,
358            watermark,
359        }
360    }
361
362    pub fn vnode_bitmap(&self) -> &Bitmap {
363        &self.vnode_bitmap
364    }
365
366    /// Vnode count derived from the bitmap.
367    pub fn vnode_count(&self) -> usize {
368        self.vnode_bitmap.len()
369    }
370
371    pub fn watermark(&self) -> &Bytes {
372        &self.watermark
373    }
374
375    pub fn to_protobuf(&self) -> PbVnodeWatermark {
376        self.into()
377    }
378}
379
380impl From<PbVnodeWatermark> for VnodeWatermark {
381    fn from(pb: PbVnodeWatermark) -> Self {
382        Self {
383            vnode_bitmap: Arc::new(Bitmap::from(pb.vnode_bitmap.as_ref().unwrap())),
384            watermark: Bytes::from(pb.watermark),
385        }
386    }
387}
388
389impl From<&PbVnodeWatermark> for VnodeWatermark {
390    fn from(pb: &PbVnodeWatermark) -> Self {
391        Self {
392            vnode_bitmap: Arc::new(Bitmap::from(pb.vnode_bitmap.as_ref().unwrap())),
393            watermark: Bytes::from(pb.watermark.clone()),
394        }
395    }
396}
397
398impl From<VnodeWatermark> for PbVnodeWatermark {
399    fn from(watermark: VnodeWatermark) -> Self {
400        Self {
401            watermark: watermark.watermark.into(),
402            vnode_bitmap: Some(watermark.vnode_bitmap.to_protobuf()),
403        }
404    }
405}
406
407impl From<&VnodeWatermark> for PbVnodeWatermark {
408    fn from(watermark: &VnodeWatermark) -> Self {
409        Self {
410            watermark: watermark.watermark.to_vec(),
411            vnode_bitmap: Some(watermark.vnode_bitmap.to_protobuf()),
412        }
413    }
414}
415
416#[derive(Clone, Debug, PartialEq)]
417pub struct TableWatermarks {
418    // later epoch at the back
419    pub watermarks: Vec<(HummockEpoch, Arc<[VnodeWatermark]>)>,
420    pub direction: WatermarkDirection,
421    pub watermark_type: WatermarkSerdeType,
422}
423
424impl TableWatermarks {
425    pub fn single_epoch(
426        epoch: HummockEpoch,
427        watermarks: Vec<VnodeWatermark>,
428        direction: WatermarkDirection,
429        watermark_type: WatermarkSerdeType,
430    ) -> Self {
431        let mut this = Self {
432            direction,
433            watermarks: Vec::new(),
434            watermark_type,
435        };
436        this.add_new_epoch_watermarks(epoch, watermarks.into(), direction, watermark_type);
437        this
438    }
439
440    pub fn add_new_epoch_watermarks(
441        &mut self,
442        epoch: HummockEpoch,
443        watermarks: Arc<[VnodeWatermark]>,
444        direction: WatermarkDirection,
445        watermark_type: WatermarkSerdeType,
446    ) {
447        assert_eq!(self.direction, direction);
448        assert_eq!(self.watermark_type, watermark_type);
449
450        if let Some((prev_epoch, _)) = self.watermarks.last() {
451            assert!(*prev_epoch < epoch);
452        }
453        if !watermarks.is_empty() {
454            let vnode_count = watermarks[0].vnode_count();
455            for watermark in &*watermarks {
456                assert_eq!(watermark.vnode_count(), vnode_count);
457            }
458            if let Some(existing_vnode_count) = self.vnode_count() {
459                assert_eq!(existing_vnode_count, vnode_count);
460            }
461        }
462        self.watermarks.push((epoch, watermarks));
463    }
464
465    /// Vnode count derived from existing watermarks. Returns `None` if there is no watermark.
466    fn vnode_count(&self) -> Option<usize> {
467        self.watermarks
468            .iter()
469            .flat_map(|(_, watermarks)| watermarks.as_ref())
470            .next()
471            .map(|w| w.vnode_count())
472    }
473}
474
475pub fn merge_multiple_new_table_watermarks(
476    table_watermarks_list: impl IntoIterator<Item = HashMap<TableId, TableWatermarks>>,
477) -> HashMap<TableId, TableWatermarks> {
478    #[allow(clippy::type_complexity)]
479    let mut ret: HashMap<
480        TableId,
481        (
482            WatermarkDirection,
483            BTreeMap<u64, Vec<VnodeWatermark>>,
484            WatermarkSerdeType,
485        ),
486    > = HashMap::new();
487    for table_watermarks in table_watermarks_list {
488        for (table_id, new_table_watermarks) in table_watermarks {
489            let epoch_watermarks = match ret.entry(table_id) {
490                Entry::Occupied(entry) => {
491                    let (direction, epoch_watermarks, watermark_type) = entry.into_mut();
492                    assert_eq!(&new_table_watermarks.direction, direction);
493                    assert_eq!(&new_table_watermarks.watermark_type, watermark_type);
494                    epoch_watermarks
495                }
496                Entry::Vacant(entry) => {
497                    let (_, epoch_watermarks, _) = entry.insert((
498                        new_table_watermarks.direction,
499                        BTreeMap::new(),
500                        new_table_watermarks.watermark_type,
501                    ));
502                    epoch_watermarks
503                }
504            };
505            for (new_epoch, new_epoch_watermarks) in new_table_watermarks.watermarks {
506                epoch_watermarks
507                    .entry(new_epoch)
508                    .or_insert_with(Vec::new)
509                    .extend(new_epoch_watermarks.iter().cloned());
510            }
511        }
512    }
513    ret.into_iter()
514        .map(
515            |(table_id, (direction, epoch_watermarks, watermark_type))| {
516                (
517                    table_id,
518                    TableWatermarks {
519                        direction,
520                        // ordered from earlier epoch to later epoch
521                        watermarks: epoch_watermarks
522                            .into_iter()
523                            .map(|(epoch, watermarks)| (epoch, Arc::from(watermarks)))
524                            .collect(),
525                        watermark_type,
526                    },
527                )
528            },
529        )
530        .collect()
531}
532
533impl TableWatermarks {
534    pub fn apply_new_table_watermarks(&mut self, newly_added_watermarks: &TableWatermarks) {
535        assert_eq!(self.direction, newly_added_watermarks.direction);
536        assert!(self.watermarks.iter().map(|(epoch, _)| epoch).is_sorted());
537        assert!(
538            newly_added_watermarks
539                .watermarks
540                .iter()
541                .map(|(epoch, _)| epoch)
542                .is_sorted()
543        );
544        // ensure that the newly added watermarks have a later epoch than the previous latest epoch.
545        if let Some((prev_last_epoch, _)) = self.watermarks.last()
546            && let Some((new_first_epoch, _)) = newly_added_watermarks.watermarks.first()
547        {
548            assert!(prev_last_epoch < new_first_epoch);
549        }
550        self.watermarks.extend(
551            newly_added_watermarks
552                .watermarks
553                .iter()
554                .map(|(epoch, new_watermarks)| (*epoch, new_watermarks.clone())),
555        );
556    }
557
558    pub fn clear_stale_epoch_watermark(&mut self, safe_epoch: u64) {
559        match self.watermarks.first() {
560            None => {
561                // return on empty watermark
562                return;
563            }
564            Some((earliest_epoch, _)) => {
565                if *earliest_epoch >= safe_epoch {
566                    // No stale epoch watermark needs to be cleared.
567                    return;
568                }
569            }
570        }
571        debug!("clear stale table watermark below epoch {}", safe_epoch);
572        let mut result_epoch_watermark = Vec::with_capacity(self.watermarks.len());
573        let mut set_vnode: HashSet<VirtualNode> = HashSet::new();
574        let mut vnode_count: Option<usize> = None; // lazy initialized on first occurrence of vnode watermark
575        while let Some((epoch, _)) = self.watermarks.last() {
576            if *epoch >= safe_epoch {
577                let (epoch, watermarks) = self.watermarks.pop().expect("have check Some");
578                for watermark in watermarks.as_ref() {
579                    vnode_count.get_or_insert_with(|| watermark.vnode_count());
580                    for vnode in watermark.vnode_bitmap.iter_vnodes() {
581                        set_vnode.insert(vnode);
582                    }
583                }
584                result_epoch_watermark.push((epoch, watermarks));
585            } else {
586                break;
587            }
588        }
589        while vnode_count != Some(set_vnode.len())
590            && let Some((_, watermarks)) = self.watermarks.pop()
591        {
592            let mut new_vnode_watermarks = Vec::new();
593            for vnode_watermark in watermarks.as_ref() {
594                let mut new_set_vnode = Vec::new();
595                vnode_count.get_or_insert_with(|| vnode_watermark.vnode_count());
596                for vnode in vnode_watermark.vnode_bitmap.iter_vnodes() {
597                    if set_vnode.insert(vnode) {
598                        new_set_vnode.push(vnode);
599                    }
600                }
601                if !new_set_vnode.is_empty() {
602                    let mut builder = BitmapBuilder::zeroed(vnode_watermark.vnode_count());
603                    for vnode in new_set_vnode {
604                        builder.set(vnode.to_index(), true);
605                    }
606                    let bitmap = Arc::new(builder.finish());
607                    new_vnode_watermarks.push(VnodeWatermark {
608                        vnode_bitmap: bitmap,
609                        watermark: vnode_watermark.watermark.clone(),
610                    })
611                }
612            }
613            if !new_vnode_watermarks.is_empty() {
614                if let Some((last_epoch, last_watermarks)) = result_epoch_watermark.last_mut()
615                    && *last_epoch == safe_epoch
616                {
617                    *last_watermarks = Arc::from(
618                        last_watermarks
619                            .iter()
620                            .cloned()
621                            .chain(new_vnode_watermarks.into_iter())
622                            .collect_vec(),
623                    );
624                } else {
625                    result_epoch_watermark.push((safe_epoch, Arc::from(new_vnode_watermarks)));
626                }
627            }
628        }
629        // epoch watermark are added from later epoch to earlier epoch.
630        // reverse to ensure that earlier epochs are at the front
631        result_epoch_watermark.reverse();
632        assert!(
633            result_epoch_watermark
634                .is_sorted_by(|(first_epoch, _), (second_epoch, _)| { first_epoch < second_epoch })
635        );
636        *self = TableWatermarks {
637            watermarks: result_epoch_watermark,
638            direction: self.direction,
639            watermark_type: self.watermark_type,
640        }
641    }
642}
643
644impl TableWatermarks {
645    pub fn estimated_encode_len(&self) -> usize {
646        self.watermarks.len() * size_of::<HummockEpoch>()
647            + self
648                .watermarks
649                .iter()
650                .map(|(_, watermarks)| {
651                    watermarks
652                        .iter()
653                        .map(|watermark| watermark.estimated_size())
654                        .sum::<usize>()
655                })
656                .sum::<usize>()
657            + size_of::<bool>() // for direction
658    }
659
660    pub fn to_protobuf(&self) -> PbTableWatermarks {
661        self.into()
662    }
663}
664
665impl From<&PbTableWatermarks> for WatermarkSerdeType {
666    fn from(pb: &PbTableWatermarks) -> Self {
667        match pb.raw_watermark_serde_type() {
668            PbWatermarkSerdeType::TypeUnspecified => {
669                // For backward compatibility.
670                #[expect(deprecated)]
671                if pb.is_non_pk_prefix {
672                    WatermarkSerdeType::NonPkPrefix
673                } else {
674                    WatermarkSerdeType::PkPrefix
675                }
676            }
677            PbWatermarkSerdeType::PkPrefix => WatermarkSerdeType::PkPrefix,
678            PbWatermarkSerdeType::NonPkPrefix => WatermarkSerdeType::NonPkPrefix,
679            PbWatermarkSerdeType::Value => WatermarkSerdeType::Value,
680        }
681    }
682}
683impl From<&PbTableWatermarks> for TableWatermarks {
684    fn from(pb: &PbTableWatermarks) -> Self {
685        let watermark_type = WatermarkSerdeType::from(pb);
686        Self {
687            watermarks: pb
688                .epoch_watermarks
689                .iter()
690                .map(|epoch_watermark| {
691                    let epoch = epoch_watermark.epoch;
692                    let watermarks = epoch_watermark
693                        .watermarks
694                        .iter()
695                        .map(VnodeWatermark::from)
696                        .collect();
697                    (epoch, watermarks)
698                })
699                .collect(),
700            direction: if pb.is_ascending {
701                WatermarkDirection::Ascending
702            } else {
703                WatermarkDirection::Descending
704            },
705            watermark_type,
706        }
707    }
708}
709
710impl From<WatermarkSerdeType> for PbWatermarkSerdeType {
711    fn from(s: WatermarkSerdeType) -> Self {
712        match s {
713            WatermarkSerdeType::PkPrefix => PbWatermarkSerdeType::PkPrefix,
714            WatermarkSerdeType::NonPkPrefix => PbWatermarkSerdeType::NonPkPrefix,
715            WatermarkSerdeType::Value => PbWatermarkSerdeType::Value,
716        }
717    }
718}
719
720impl From<&TableWatermarks> for PbTableWatermarks {
721    fn from(table_watermarks: &TableWatermarks) -> Self {
722        let is_non_pk_prefix = match table_watermarks.watermark_type {
723            WatermarkSerdeType::PkPrefix => false,
724            WatermarkSerdeType::NonPkPrefix => true,
725            WatermarkSerdeType::Value => false,
726        };
727        Self {
728            epoch_watermarks: table_watermarks
729                .watermarks
730                .iter()
731                .map(|(epoch, watermarks)| PbEpochNewWatermarks {
732                    watermarks: watermarks.iter().map(|wm| wm.into()).collect(),
733                    epoch: *epoch,
734                })
735                .collect(),
736            is_ascending: match table_watermarks.direction {
737                WatermarkDirection::Ascending => true,
738                WatermarkDirection::Descending => false,
739            },
740            #[expect(deprecated)]
741            is_non_pk_prefix,
742            raw_watermark_serde_type: PbWatermarkSerdeType::from(table_watermarks.watermark_type)
743                as i32,
744        }
745    }
746}
747
748impl From<PbTableWatermarks> for TableWatermarks {
749    fn from(pb: PbTableWatermarks) -> Self {
750        let watermark_type = WatermarkSerdeType::from(&pb);
751        Self {
752            watermarks: pb
753                .epoch_watermarks
754                .into_iter()
755                .map(|epoch_watermark| {
756                    let epoch = epoch_watermark.epoch;
757                    let watermarks = epoch_watermark
758                        .watermarks
759                        .into_iter()
760                        .map(VnodeWatermark::from)
761                        .collect();
762                    (epoch, watermarks)
763                })
764                .collect(),
765            direction: if pb.is_ascending {
766                WatermarkDirection::Ascending
767            } else {
768                WatermarkDirection::Descending
769            },
770            watermark_type,
771        }
772    }
773}
774
775impl From<TableWatermarks> for PbTableWatermarks {
776    fn from(table_watermarks: TableWatermarks) -> Self {
777        Self {
778            epoch_watermarks: table_watermarks
779                .watermarks
780                .into_iter()
781                .map(|(epoch, watermarks)| PbEpochNewWatermarks {
782                    watermarks: watermarks.iter().map(PbVnodeWatermark::from).collect(),
783                    epoch,
784                })
785                .collect(),
786            is_ascending: match table_watermarks.direction {
787                WatermarkDirection::Ascending => true,
788                WatermarkDirection::Descending => false,
789            },
790            #[expect(deprecated)]
791            is_non_pk_prefix: match table_watermarks.watermark_type {
792                WatermarkSerdeType::NonPkPrefix => true,
793                WatermarkSerdeType::PkPrefix => false,
794                WatermarkSerdeType::Value => false,
795            },
796            raw_watermark_serde_type: PbWatermarkSerdeType::from(table_watermarks.watermark_type)
797                as i32,
798        }
799    }
800}
801
802#[derive(Debug, Clone, Copy, PartialEq, Eq)]
803pub enum WatermarkSerdeType {
804    PkPrefix,
805    NonPkPrefix,
806    Value,
807}
808
809#[cfg(test)]
810mod tests {
811    use std::collections::Bound::Included;
812    use std::collections::{Bound, HashMap};
813    use std::ops::Bound::Excluded;
814    use std::sync::Arc;
815    use std::vec;
816
817    use bytes::Bytes;
818    use itertools::Itertools;
819    use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
820    use risingwave_common::catalog::TableId;
821    use risingwave_common::hash::VirtualNode;
822    use risingwave_common::util::epoch::{EpochExt, test_epoch};
823    use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo};
824
825    use crate::compaction_group::StaticCompactionGroupId;
826    use crate::key::{TableKeyRange, is_empty_key_range, prefixed_range_with_vnode};
827    use crate::table_watermark::{
828        PkPrefixTableWatermarksIndex, TableWatermarks, VnodeWatermark, WatermarkDirection,
829        WatermarkSerdeType, merge_multiple_new_table_watermarks,
830    };
831    use crate::version::HummockVersion;
832
833    fn build_bitmap(vnodes: impl IntoIterator<Item = usize>) -> Arc<Bitmap> {
834        let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
835        for vnode in vnodes {
836            builder.set(vnode, true);
837        }
838        Arc::new(builder.finish())
839    }
840
841    #[test]
842    fn test_apply_new_table_watermark() {
843        let epoch1 = test_epoch(1);
844        let direction = WatermarkDirection::Ascending;
845        let watermark1 = Bytes::from("watermark1");
846        let watermark2 = Bytes::from("watermark2");
847        let watermark3 = Bytes::from("watermark3");
848        let watermark4 = Bytes::from("watermark4");
849        let watermark_type = WatermarkSerdeType::PkPrefix;
850        let mut table_watermarks = TableWatermarks::single_epoch(
851            epoch1,
852            vec![VnodeWatermark::new(build_bitmap(vec![0, 1, 2]), watermark1)],
853            direction,
854            watermark_type,
855        );
856        let epoch2 = epoch1.next_epoch();
857        table_watermarks.add_new_epoch_watermarks(
858            epoch2,
859            vec![VnodeWatermark::new(
860                build_bitmap(vec![0, 1, 2, 3]),
861                watermark2,
862            )]
863            .into(),
864            direction,
865            watermark_type,
866        );
867
868        let mut table_watermark_checkpoint = table_watermarks.clone();
869
870        let epoch3 = epoch2.next_epoch();
871        let mut second_table_watermark = TableWatermarks::single_epoch(
872            epoch3,
873            vec![VnodeWatermark::new(
874                build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
875                watermark3.clone(),
876            )],
877            direction,
878            watermark_type,
879        );
880        table_watermarks.add_new_epoch_watermarks(
881            epoch3,
882            vec![VnodeWatermark::new(
883                build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
884                watermark3,
885            )]
886            .into(),
887            direction,
888            watermark_type,
889        );
890        let epoch4 = epoch3.next_epoch();
891        let epoch5 = epoch4.next_epoch();
892        table_watermarks.add_new_epoch_watermarks(
893            epoch5,
894            vec![VnodeWatermark::new(
895                build_bitmap(vec![0, 3, 4]),
896                watermark4.clone(),
897            )]
898            .into(),
899            direction,
900            watermark_type,
901        );
902        second_table_watermark.add_new_epoch_watermarks(
903            epoch5,
904            vec![VnodeWatermark::new(build_bitmap(vec![0, 3, 4]), watermark4)].into(),
905            direction,
906            watermark_type,
907        );
908
909        table_watermark_checkpoint.apply_new_table_watermarks(&second_table_watermark);
910        assert_eq!(table_watermarks, table_watermark_checkpoint);
911    }
912
913    #[test]
914    fn test_clear_stale_epoch_watmermark() {
915        let epoch1 = test_epoch(1);
916        let direction = WatermarkDirection::Ascending;
917        let watermark1 = Bytes::from("watermark1");
918        let watermark2 = Bytes::from("watermark2");
919        let watermark3 = Bytes::from("watermark3");
920        let watermark4 = Bytes::from("watermark4");
921        let watermark_type = WatermarkSerdeType::PkPrefix;
922        let mut table_watermarks = TableWatermarks::single_epoch(
923            epoch1,
924            vec![VnodeWatermark::new(build_bitmap(vec![0, 1, 2]), watermark1)],
925            direction,
926            watermark_type,
927        );
928        let epoch2 = epoch1.next_epoch();
929        table_watermarks.add_new_epoch_watermarks(
930            epoch2,
931            vec![VnodeWatermark::new(
932                build_bitmap(vec![0, 1, 2, 3]),
933                watermark2.clone(),
934            )]
935            .into(),
936            direction,
937            watermark_type,
938        );
939        let epoch3 = epoch2.next_epoch();
940        table_watermarks.add_new_epoch_watermarks(
941            epoch3,
942            vec![VnodeWatermark::new(
943                build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
944                watermark3.clone(),
945            )]
946            .into(),
947            direction,
948            watermark_type,
949        );
950        let epoch4 = epoch3.next_epoch();
951        let epoch5 = epoch4.next_epoch();
952        table_watermarks.add_new_epoch_watermarks(
953            epoch5,
954            vec![VnodeWatermark::new(
955                build_bitmap(vec![0, 3, 4]),
956                watermark4.clone(),
957            )]
958            .into(),
959            direction,
960            watermark_type,
961        );
962
963        let mut table_watermarks_checkpoint = table_watermarks.clone();
964        table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch1);
965        assert_eq!(table_watermarks_checkpoint, table_watermarks);
966
967        table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch2);
968        assert_eq!(
969            table_watermarks_checkpoint,
970            TableWatermarks {
971                watermarks: vec![
972                    (
973                        epoch2,
974                        vec![VnodeWatermark::new(
975                            build_bitmap(vec![0, 1, 2, 3]),
976                            watermark2,
977                        )]
978                        .into()
979                    ),
980                    (
981                        epoch3,
982                        vec![VnodeWatermark::new(
983                            build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
984                            watermark3.clone(),
985                        )]
986                        .into()
987                    ),
988                    (
989                        epoch5,
990                        vec![VnodeWatermark::new(
991                            build_bitmap(vec![0, 3, 4]),
992                            watermark4.clone(),
993                        )]
994                        .into()
995                    )
996                ],
997                direction,
998                watermark_type,
999            }
1000        );
1001
1002        table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch3);
1003        assert_eq!(
1004            table_watermarks_checkpoint,
1005            TableWatermarks {
1006                watermarks: vec![
1007                    (
1008                        epoch3,
1009                        vec![VnodeWatermark::new(
1010                            build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
1011                            watermark3.clone(),
1012                        )]
1013                        .into()
1014                    ),
1015                    (
1016                        epoch5,
1017                        vec![VnodeWatermark::new(
1018                            build_bitmap(vec![0, 3, 4]),
1019                            watermark4.clone(),
1020                        )]
1021                        .into()
1022                    )
1023                ],
1024                direction,
1025                watermark_type,
1026            }
1027        );
1028
1029        table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch4);
1030        assert_eq!(
1031            table_watermarks_checkpoint,
1032            TableWatermarks {
1033                watermarks: vec![
1034                    (
1035                        epoch4,
1036                        vec![VnodeWatermark::new(
1037                            build_bitmap((1..3).chain(5..VirtualNode::COUNT_FOR_TEST)),
1038                            watermark3.clone()
1039                        )]
1040                        .into()
1041                    ),
1042                    (
1043                        epoch5,
1044                        vec![VnodeWatermark::new(
1045                            build_bitmap(vec![0, 3, 4]),
1046                            watermark4.clone(),
1047                        )]
1048                        .into()
1049                    )
1050                ],
1051                direction,
1052                watermark_type,
1053            }
1054        );
1055
1056        table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch5);
1057        assert_eq!(
1058            table_watermarks_checkpoint,
1059            TableWatermarks {
1060                watermarks: vec![(
1061                    epoch5,
1062                    vec![
1063                        VnodeWatermark::new(build_bitmap(vec![0, 3, 4]), watermark4),
1064                        VnodeWatermark::new(
1065                            build_bitmap((1..3).chain(5..VirtualNode::COUNT_FOR_TEST)),
1066                            watermark3
1067                        )
1068                    ]
1069                    .into()
1070                )],
1071                direction,
1072                watermark_type,
1073            }
1074        );
1075    }
1076
1077    #[test]
1078    fn test_merge_multiple_new_table_watermarks() {
1079        fn epoch_new_watermark(epoch: u64, bitmaps: Vec<&Bitmap>) -> (u64, Arc<[VnodeWatermark]>) {
1080            (
1081                epoch,
1082                bitmaps
1083                    .into_iter()
1084                    .map(|bitmap| VnodeWatermark {
1085                        watermark: Bytes::from(vec![1, 2, epoch as _]),
1086                        vnode_bitmap: Arc::new(bitmap.clone()),
1087                    })
1088                    .collect_vec()
1089                    .into(),
1090            )
1091        }
1092        fn build_table_watermark(
1093            vnodes: impl IntoIterator<Item = usize>,
1094            epochs: impl IntoIterator<Item = u64>,
1095        ) -> TableWatermarks {
1096            let bitmap = build_bitmap(vnodes);
1097            TableWatermarks {
1098                watermarks: epochs
1099                    .into_iter()
1100                    .map(|epoch: u64| epoch_new_watermark(epoch, vec![&bitmap]))
1101                    .collect(),
1102                direction: WatermarkDirection::Ascending,
1103                watermark_type: WatermarkSerdeType::PkPrefix,
1104            }
1105        }
1106        let table1_watermark1 = build_table_watermark(0..3, vec![1, 2, 4]);
1107        let table1_watermark2 = build_table_watermark(4..6, vec![1, 2, 5]);
1108        let table2_watermark = build_table_watermark(0..4, 1..3);
1109        let table3_watermark = build_table_watermark(0..4, 3..5);
1110        let mut first = HashMap::new();
1111        first.insert(TableId::new(1), table1_watermark1);
1112        first.insert(TableId::new(2), table2_watermark.clone());
1113        let mut second = HashMap::new();
1114        second.insert(TableId::new(1), table1_watermark2);
1115        second.insert(TableId::new(3), table3_watermark.clone());
1116        let result = merge_multiple_new_table_watermarks(vec![first, second]);
1117        let mut expected = HashMap::new();
1118        expected.insert(
1119            TableId::new(1),
1120            TableWatermarks {
1121                watermarks: vec![
1122                    epoch_new_watermark(1, vec![&build_bitmap(0..3), &build_bitmap(4..6)]),
1123                    epoch_new_watermark(2, vec![&build_bitmap(0..3), &build_bitmap(4..6)]),
1124                    epoch_new_watermark(4, vec![&build_bitmap(0..3)]),
1125                    epoch_new_watermark(5, vec![&build_bitmap(4..6)]),
1126                ],
1127                direction: WatermarkDirection::Ascending,
1128                watermark_type: WatermarkSerdeType::PkPrefix,
1129            },
1130        );
1131        expected.insert(TableId::new(2), table2_watermark);
1132        expected.insert(TableId::new(3), table3_watermark);
1133        assert_eq!(result, expected);
1134    }
1135
1136    const COMMITTED_EPOCH: u64 = test_epoch(1);
1137    const EPOCH1: u64 = test_epoch(2);
1138    const EPOCH2: u64 = test_epoch(3);
1139    const TEST_SINGLE_VNODE: VirtualNode = VirtualNode::from_index(1);
1140
1141    fn build_watermark_range(
1142        direction: WatermarkDirection,
1143        (low, high): (Bound<Bytes>, Bound<Bytes>),
1144    ) -> TableKeyRange {
1145        let range = match direction {
1146            WatermarkDirection::Ascending => (low, high),
1147            WatermarkDirection::Descending => (high, low),
1148        };
1149        prefixed_range_with_vnode(range, TEST_SINGLE_VNODE)
1150    }
1151
1152    /// Build and return a watermark index with the following watermarks
1153    /// EPOCH1 bitmap(0, 1, 2, 3) watermark1
1154    /// EPOCH2 bitmap(1, 2, 3, 4) watermark2
1155    fn build_and_test_watermark_index(
1156        direction: WatermarkDirection,
1157        watermark1: Bytes,
1158        watermark2: Bytes,
1159        watermark3: Bytes,
1160    ) -> PkPrefixTableWatermarksIndex {
1161        let mut index = PkPrefixTableWatermarksIndex::new(
1162            direction,
1163            EPOCH1,
1164            vec![VnodeWatermark::new(build_bitmap(0..4), watermark1.clone())],
1165            Some(COMMITTED_EPOCH),
1166        );
1167        index.add_epoch_watermark(
1168            EPOCH2,
1169            vec![VnodeWatermark::new(build_bitmap(1..5), watermark2.clone())].into(),
1170            direction,
1171        );
1172
1173        assert_eq!(
1174            index.read_watermark(VirtualNode::from_index(0), EPOCH1),
1175            Some(watermark1.clone())
1176        );
1177        assert_eq!(
1178            index.read_watermark(VirtualNode::from_index(1), EPOCH1),
1179            Some(watermark1.clone())
1180        );
1181        assert_eq!(
1182            index.read_watermark(VirtualNode::from_index(4), EPOCH1),
1183            None
1184        );
1185        assert_eq!(
1186            index.read_watermark(VirtualNode::from_index(0), EPOCH2),
1187            Some(watermark1.clone())
1188        );
1189        assert_eq!(
1190            index.read_watermark(VirtualNode::from_index(1), EPOCH2),
1191            Some(watermark2.clone())
1192        );
1193        assert_eq!(
1194            index.read_watermark(VirtualNode::from_index(4), EPOCH2),
1195            Some(watermark2.clone())
1196        );
1197        assert_eq!(
1198            index.latest_watermark(VirtualNode::from_index(0)),
1199            Some(watermark1.clone())
1200        );
1201        assert_eq!(
1202            index.latest_watermark(VirtualNode::from_index(1)),
1203            Some(watermark2.clone())
1204        );
1205        assert_eq!(
1206            index.latest_watermark(VirtualNode::from_index(4)),
1207            Some(watermark2.clone())
1208        );
1209
1210        // watermark is watermark2
1211        let check_watermark_range =
1212            |query_range: (Bound<Bytes>, Bound<Bytes>),
1213             output_range: Option<(Bound<Bytes>, Bound<Bytes>)>| {
1214                let mut range = build_watermark_range(direction, query_range);
1215                index.rewrite_range_with_table_watermark(EPOCH2, &mut range);
1216                if let Some(output_range) = output_range {
1217                    assert_eq!(range, build_watermark_range(direction, output_range));
1218                } else {
1219                    assert!(is_empty_key_range(&range));
1220                }
1221            };
1222
1223        // test read from single vnode and truncate begin key range
1224        check_watermark_range(
1225            (Included(watermark1.clone()), Excluded(watermark3.clone())),
1226            Some((Included(watermark2.clone()), Excluded(watermark3.clone()))),
1227        );
1228
1229        // test read from single vnode and begin key right at watermark
1230        check_watermark_range(
1231            (Included(watermark2.clone()), Excluded(watermark3.clone())),
1232            Some((Included(watermark2.clone()), Excluded(watermark3.clone()))),
1233        );
1234        check_watermark_range(
1235            (Excluded(watermark2.clone()), Excluded(watermark3.clone())),
1236            Some((Excluded(watermark2.clone()), Excluded(watermark3))),
1237        );
1238
1239        // test read from single vnode and end key right at watermark
1240        check_watermark_range(
1241            (Excluded(watermark1.clone()), Excluded(watermark2.clone())),
1242            None,
1243        );
1244        check_watermark_range(
1245            (Excluded(watermark1), Included(watermark2.clone())),
1246            Some((Included(watermark2.clone()), Included(watermark2))),
1247        );
1248
1249        index
1250    }
1251
1252    #[test]
1253    fn test_watermark_index_ascending() {
1254        let watermark1 = Bytes::from_static(b"watermark1");
1255        let watermark2 = Bytes::from_static(b"watermark2");
1256        let watermark3 = Bytes::from_static(b"watermark3");
1257        build_and_test_watermark_index(
1258            WatermarkDirection::Ascending,
1259            watermark1,
1260            watermark2,
1261            watermark3,
1262        );
1263    }
1264
1265    #[test]
1266    fn test_watermark_index_descending() {
1267        let watermark1 = Bytes::from_static(b"watermark254");
1268        let watermark2 = Bytes::from_static(b"watermark253");
1269        let watermark3 = Bytes::from_static(b"watermark252");
1270        build_and_test_watermark_index(
1271            WatermarkDirection::Descending,
1272            watermark1,
1273            watermark2,
1274            watermark3,
1275        );
1276    }
1277
1278    #[test]
1279    fn test_apply_committed_index() {
1280        let watermark1 = Bytes::from_static(b"watermark1");
1281        let watermark2 = Bytes::from_static(b"watermark2");
1282        let watermark3 = Bytes::from_static(b"watermark3");
1283        let mut index = build_and_test_watermark_index(
1284            WatermarkDirection::Ascending,
1285            watermark1.clone(),
1286            watermark2.clone(),
1287            watermark3,
1288        );
1289
1290        let test_table_id = TableId::from(233);
1291
1292        let mut version = HummockVersion::from_rpc_protobuf(&PbHummockVersion {
1293            state_table_info: HashMap::from_iter([(
1294                test_table_id,
1295                StateTableInfo {
1296                    committed_epoch: EPOCH1,
1297                    compaction_group_id: StaticCompactionGroupId::StateDefault as _,
1298                },
1299            )]),
1300            ..Default::default()
1301        });
1302        version.table_watermarks.insert(
1303            test_table_id,
1304            TableWatermarks {
1305                watermarks: vec![(
1306                    EPOCH1,
1307                    vec![VnodeWatermark {
1308                        watermark: watermark1.clone(),
1309                        vnode_bitmap: build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
1310                    }]
1311                    .into(),
1312                )],
1313                direction: WatermarkDirection::Ascending,
1314                watermark_type: WatermarkSerdeType::PkPrefix,
1315            }
1316            .into(),
1317        );
1318        index.apply_committed_watermarks(
1319            version
1320                .table_watermarks
1321                .get(&test_table_id)
1322                .unwrap()
1323                .clone(),
1324            EPOCH1,
1325        );
1326        assert_eq!(EPOCH1, index.committed_epoch.unwrap());
1327        assert_eq!(EPOCH2, index.latest_epoch);
1328        for vnode in 0..VirtualNode::COUNT_FOR_TEST {
1329            let vnode = VirtualNode::from_index(vnode);
1330            if (1..5).contains(&vnode.to_index()) {
1331                assert_eq!(watermark1, index.read_watermark(vnode, EPOCH1).unwrap());
1332                assert_eq!(watermark2, index.read_watermark(vnode, EPOCH2).unwrap());
1333            } else {
1334                assert_eq!(watermark1, index.read_watermark(vnode, EPOCH1).unwrap());
1335            }
1336        }
1337    }
1338
1339    #[test]
1340    fn test_filter_regress_watermark() {
1341        let watermark1 = Bytes::from_static(b"watermark1");
1342        let watermark2 = Bytes::from_static(b"watermark2");
1343        let watermark3 = Bytes::from_static(b"watermark3");
1344        let index = build_and_test_watermark_index(
1345            WatermarkDirection::Ascending,
1346            watermark1.clone(),
1347            watermark2,
1348            watermark3.clone(),
1349        );
1350
1351        let mut new_watermarks = vec![
1352            // Partial regress
1353            VnodeWatermark {
1354                vnode_bitmap: build_bitmap(0..2),
1355                watermark: watermark1.clone(),
1356            },
1357            // All not regress
1358            VnodeWatermark {
1359                vnode_bitmap: build_bitmap(2..4),
1360                watermark: watermark3.clone(),
1361            },
1362            // All regress
1363            VnodeWatermark {
1364                vnode_bitmap: build_bitmap(4..5),
1365                watermark: watermark1.clone(),
1366            },
1367            // All newly set vnode
1368            VnodeWatermark {
1369                vnode_bitmap: build_bitmap(5..6),
1370                watermark: watermark3.clone(),
1371            },
1372        ];
1373
1374        index.filter_regress_watermarks(&mut new_watermarks);
1375
1376        assert_eq!(
1377            new_watermarks,
1378            vec![
1379                VnodeWatermark {
1380                    vnode_bitmap: build_bitmap(0..1),
1381                    watermark: watermark1,
1382                },
1383                VnodeWatermark {
1384                    vnode_bitmap: build_bitmap(2..4),
1385                    watermark: watermark3.clone(),
1386                },
1387                VnodeWatermark {
1388                    vnode_bitmap: build_bitmap(5..6),
1389                    watermark: watermark3,
1390                },
1391            ]
1392        );
1393    }
1394}