risingwave_hummock_sdk/
table_watermark.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
17use std::fmt::Display;
18use std::mem::size_of;
19use std::ops::Bound::{Excluded, Included, Unbounded};
20use std::sync::Arc;
21
22use bytes::Bytes;
23use itertools::Itertools;
24use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
25use risingwave_common::catalog::TableId;
26use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
27use risingwave_common::types::ToDatumRef;
28use risingwave_common::util::sort_util::{OrderType, cmp_datum};
29use risingwave_common_estimate_size::EstimateSize;
30use risingwave_pb::hummock::table_watermarks::PbEpochNewWatermarks;
31use risingwave_pb::hummock::{
32    PbVnodeWatermark, PbWatermarkSerdeType, TableWatermarks as PbTableWatermarks,
33};
34use tracing::{debug, warn};
35
36use crate::HummockEpoch;
37use crate::key::{TableKey, TableKeyRange, prefix_slice_with_vnode, vnode};
38
39#[derive(Clone)]
40pub struct ReadTableWatermark {
41    pub direction: WatermarkDirection,
42    pub vnode_watermarks: BTreeMap<VirtualNode, Bytes>,
43}
44
45#[derive(Clone)]
46pub struct TableWatermarksIndex {
47    pub watermark_direction: WatermarkDirection,
48    // later epoch at the back
49    pub staging_watermarks: VecDeque<(HummockEpoch, Arc<[VnodeWatermark]>)>,
50    pub committed_watermarks: Option<Arc<TableWatermarks>>,
51    latest_epoch: HummockEpoch,
52    committed_epoch: Option<HummockEpoch>,
53    watermark_type: WatermarkSerdeType,
54}
55
56impl TableWatermarksIndex {
57    pub fn new(
58        watermark_direction: WatermarkDirection,
59        first_epoch: HummockEpoch,
60        first_vnode_watermark: Vec<VnodeWatermark>,
61        committed_epoch: Option<HummockEpoch>,
62        watermark_type: WatermarkSerdeType,
63    ) -> Self {
64        if let Some(committed_epoch) = committed_epoch {
65            assert!(first_epoch > committed_epoch);
66        }
67        Self {
68            watermark_direction,
69            staging_watermarks: VecDeque::from_iter([(
70                first_epoch,
71                Arc::from(first_vnode_watermark),
72            )]),
73            committed_watermarks: None,
74            latest_epoch: first_epoch,
75            committed_epoch,
76            watermark_type,
77        }
78    }
79
80    pub fn new_committed(
81        committed_watermarks: Arc<TableWatermarks>,
82        committed_epoch: HummockEpoch,
83        watermark_type: WatermarkSerdeType,
84    ) -> Self {
85        Self {
86            watermark_direction: committed_watermarks.direction,
87            staging_watermarks: VecDeque::new(),
88            committed_epoch: Some(committed_epoch),
89            latest_epoch: committed_epoch,
90            committed_watermarks: Some(committed_watermarks),
91            watermark_type,
92        }
93    }
94
95    pub fn read_watermark(&self, vnode: VirtualNode, epoch: HummockEpoch) -> Option<Bytes> {
96        // iterate from new epoch to old epoch
97        for (watermark_epoch, vnode_watermark_list) in self.staging_watermarks.iter().rev().chain(
98            self.committed_watermarks
99                .iter()
100                .flat_map(|watermarks| watermarks.watermarks.iter().rev()),
101        ) {
102            if *watermark_epoch > epoch {
103                continue;
104            }
105            for vnode_watermark in vnode_watermark_list.as_ref() {
106                if vnode_watermark.vnode_bitmap.is_set(vnode.to_index()) {
107                    return Some(vnode_watermark.watermark.clone());
108                }
109            }
110        }
111        None
112    }
113
114    pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
115        self.read_watermark(vnode, HummockEpoch::MAX)
116    }
117
118    pub fn rewrite_range_with_table_watermark(
119        &self,
120        epoch: HummockEpoch,
121        key_range: &mut TableKeyRange,
122    ) {
123        if !matches!(self.watermark_type, WatermarkSerdeType::PkPrefix) {
124            // TODO: Storage read respecting watermark will be implemented in a later PR in StateTable.
125            return;
126        }
127        let vnode = vnode(key_range);
128        if let Some(watermark) = self.read_watermark(vnode, epoch) {
129            match self.watermark_direction {
130                WatermarkDirection::Ascending => {
131                    let overwrite_start_key = match &key_range.0 {
132                        Included(start_key) | Excluded(start_key) => {
133                            start_key.key_part() < watermark
134                        }
135                        Unbounded => true,
136                    };
137                    if overwrite_start_key {
138                        let watermark_key = TableKey(prefix_slice_with_vnode(vnode, &watermark));
139                        let fully_filtered = match &key_range.1 {
140                            Included(end_key) => end_key < &watermark_key,
141                            Excluded(end_key) => end_key <= &watermark_key,
142                            Unbounded => false,
143                        };
144                        if fully_filtered {
145                            key_range.1 = Excluded(watermark_key.clone());
146                        }
147                        key_range.0 = Included(watermark_key);
148                    }
149                }
150                WatermarkDirection::Descending => {
151                    let overwrite_end_key = match &key_range.1 {
152                        Included(end_key) | Excluded(end_key) => end_key.key_part() > watermark,
153                        Unbounded => true,
154                    };
155                    if overwrite_end_key {
156                        let watermark_key = TableKey(prefix_slice_with_vnode(vnode, &watermark));
157                        let fully_filtered = match &key_range.0 {
158                            Included(start_key) => start_key > &watermark_key,
159                            Excluded(start_key) => start_key >= &watermark_key,
160                            Unbounded => false,
161                        };
162                        if fully_filtered {
163                            *key_range = (Included(watermark_key.clone()), Excluded(watermark_key));
164                        } else {
165                            key_range.1 = Included(watermark_key);
166                        }
167                    }
168                }
169            }
170        }
171    }
172
173    pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
174        if !matches!(self.watermark_type, WatermarkSerdeType::PkPrefix) {
175            // The current kv log store can emit regressed watermarks, but only for WatermarkSerdeType::PkPrefix.
176            return;
177        }
178        let mut ret = Vec::with_capacity(watermarks.len());
179        for watermark in watermarks.drain(..) {
180            let vnode_count = watermark.vnode_count();
181
182            let mut regress_vnodes = None;
183            for vnode in watermark.vnode_bitmap.iter_vnodes() {
184                if let Some(prev_watermark) = self.latest_watermark(vnode) {
185                    let is_regress = match self.direction() {
186                        WatermarkDirection::Ascending => prev_watermark > watermark.watermark,
187                        WatermarkDirection::Descending => prev_watermark < watermark.watermark,
188                    };
189                    if is_regress {
190                        warn!(
191                            "table watermark regress: {:?} {} {:?} {:?}",
192                            self.direction(),
193                            vnode,
194                            watermark.watermark,
195                            prev_watermark
196                        );
197                        regress_vnodes
198                            .get_or_insert_with(|| BitmapBuilder::zeroed(vnode_count))
199                            .set(vnode.to_index(), true);
200                    }
201                }
202            }
203            if let Some(regress_vnodes) = regress_vnodes {
204                let mut bitmap_builder = None;
205                for vnode in watermark.vnode_bitmap.iter_vnodes() {
206                    let vnode_index = vnode.to_index();
207                    if !regress_vnodes.is_set(vnode_index) {
208                        bitmap_builder
209                            .get_or_insert_with(|| BitmapBuilder::zeroed(vnode_count))
210                            .set(vnode_index, true);
211                    }
212                }
213                if let Some(bitmap_builder) = bitmap_builder {
214                    ret.push(VnodeWatermark::new(
215                        Arc::new(bitmap_builder.finish()),
216                        watermark.watermark,
217                    ));
218                }
219            } else {
220                // no vnode has regress watermark
221                ret.push(watermark);
222            }
223        }
224        *watermarks = ret;
225    }
226
227    pub fn direction(&self) -> WatermarkDirection {
228        self.watermark_direction
229    }
230
231    pub fn add_epoch_watermark(
232        &mut self,
233        epoch: HummockEpoch,
234        vnode_watermark_list: Arc<[VnodeWatermark]>,
235        direction: WatermarkDirection,
236    ) {
237        assert!(epoch > self.latest_epoch);
238        assert_eq!(self.watermark_direction, direction);
239        self.latest_epoch = epoch;
240        #[cfg(debug_assertions)]
241        if !vnode_watermark_list.is_empty() {
242            let vnode_count = vnode_watermark_list[0].vnode_count();
243            let mut vnode_is_set = BitmapBuilder::zeroed(vnode_count);
244            for vnode_watermark in vnode_watermark_list.as_ref() {
245                for vnode in vnode_watermark.vnode_bitmap.iter_ones() {
246                    assert!(!vnode_is_set.is_set(vnode));
247                    vnode_is_set.set(vnode, true);
248                    let vnode = VirtualNode::from_index(vnode);
249                    if let Some(prev_watermark) = self.latest_watermark(vnode) {
250                        match self.watermark_direction {
251                            WatermarkDirection::Ascending => {
252                                assert!(vnode_watermark.watermark >= prev_watermark);
253                            }
254                            WatermarkDirection::Descending => {
255                                assert!(vnode_watermark.watermark <= prev_watermark);
256                            }
257                        };
258                    }
259                }
260            }
261        }
262        self.staging_watermarks
263            .push_back((epoch, vnode_watermark_list));
264    }
265
266    pub fn apply_committed_watermarks(
267        &mut self,
268        committed_watermark: Arc<TableWatermarks>,
269        committed_epoch: HummockEpoch,
270    ) {
271        assert_eq!(self.watermark_direction, committed_watermark.direction);
272        if let Some(prev_committed_epoch) = self.committed_epoch {
273            assert!(prev_committed_epoch <= committed_epoch);
274            if prev_committed_epoch == committed_epoch {
275                return;
276            }
277        }
278        if self.latest_epoch < committed_epoch {
279            debug!(
280                latest_epoch = self.latest_epoch,
281                committed_epoch, "committed_epoch exceed table watermark latest_epoch"
282            );
283            self.latest_epoch = committed_epoch;
284        }
285        self.committed_epoch = Some(committed_epoch);
286        self.committed_watermarks = Some(committed_watermark);
287        // keep only watermark higher than committed epoch
288        while let Some((old_epoch, _)) = self.staging_watermarks.front()
289            && *old_epoch <= committed_epoch
290        {
291            let _ = self.staging_watermarks.pop_front();
292        }
293    }
294}
295
296#[derive(Debug, Clone, Copy, Eq, PartialEq)]
297pub enum WatermarkDirection {
298    Ascending,
299    Descending,
300}
301
302impl Display for WatermarkDirection {
303    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
304        match self {
305            WatermarkDirection::Ascending => write!(f, "Ascending"),
306            WatermarkDirection::Descending => write!(f, "Descending"),
307        }
308    }
309}
310
311impl WatermarkDirection {
312    pub fn key_filter_by_watermark(
313        &self,
314        key: impl AsRef<[u8]>,
315        watermark: impl AsRef<[u8]>,
316    ) -> bool {
317        let key = key.as_ref();
318        let watermark = watermark.as_ref();
319        match self {
320            WatermarkDirection::Ascending => key < watermark,
321            WatermarkDirection::Descending => key > watermark,
322        }
323    }
324
325    pub fn datum_filter_by_watermark(
326        &self,
327        watermark_col: impl ToDatumRef,
328        watermark: impl ToDatumRef,
329        order_type: OrderType,
330    ) -> bool {
331        let watermark_col = watermark_col.to_datum_ref();
332        let watermark = watermark.to_datum_ref();
333        match self {
334            WatermarkDirection::Ascending => {
335                // watermark_col < watermark
336                cmp_datum(watermark_col, watermark, order_type).is_lt()
337            }
338            WatermarkDirection::Descending => {
339                //  watermark_col > watermark
340                cmp_datum(watermark_col, watermark, order_type).is_gt()
341            }
342        }
343    }
344
345    pub fn is_ascending(&self) -> bool {
346        match self {
347            WatermarkDirection::Ascending => true,
348            WatermarkDirection::Descending => false,
349        }
350    }
351}
352
353#[derive(Clone, Debug, PartialEq, EstimateSize)]
354pub struct VnodeWatermark {
355    vnode_bitmap: Arc<Bitmap>,
356    watermark: Bytes,
357}
358
359impl VnodeWatermark {
360    pub fn new(vnode_bitmap: Arc<Bitmap>, watermark: Bytes) -> Self {
361        Self {
362            vnode_bitmap,
363            watermark,
364        }
365    }
366
367    pub fn vnode_bitmap(&self) -> &Bitmap {
368        &self.vnode_bitmap
369    }
370
371    /// Vnode count derived from the bitmap.
372    pub fn vnode_count(&self) -> usize {
373        self.vnode_bitmap.len()
374    }
375
376    pub fn watermark(&self) -> &Bytes {
377        &self.watermark
378    }
379
380    pub fn to_protobuf(&self) -> PbVnodeWatermark {
381        self.into()
382    }
383}
384
385impl From<PbVnodeWatermark> for VnodeWatermark {
386    fn from(pb: PbVnodeWatermark) -> Self {
387        Self {
388            vnode_bitmap: Arc::new(Bitmap::from(pb.vnode_bitmap.as_ref().unwrap())),
389            watermark: Bytes::from(pb.watermark),
390        }
391    }
392}
393
394impl From<&PbVnodeWatermark> for VnodeWatermark {
395    fn from(pb: &PbVnodeWatermark) -> Self {
396        Self {
397            vnode_bitmap: Arc::new(Bitmap::from(pb.vnode_bitmap.as_ref().unwrap())),
398            watermark: Bytes::from(pb.watermark.clone()),
399        }
400    }
401}
402
403impl From<VnodeWatermark> for PbVnodeWatermark {
404    fn from(watermark: VnodeWatermark) -> Self {
405        Self {
406            watermark: watermark.watermark.into(),
407            vnode_bitmap: Some(watermark.vnode_bitmap.to_protobuf()),
408        }
409    }
410}
411
412impl From<&VnodeWatermark> for PbVnodeWatermark {
413    fn from(watermark: &VnodeWatermark) -> Self {
414        Self {
415            watermark: watermark.watermark.to_vec(),
416            vnode_bitmap: Some(watermark.vnode_bitmap.to_protobuf()),
417        }
418    }
419}
420
421#[derive(Clone, Debug, PartialEq)]
422pub struct TableWatermarks {
423    // later epoch at the back
424    pub watermarks: Vec<(HummockEpoch, Arc<[VnodeWatermark]>)>,
425    pub direction: WatermarkDirection,
426    pub watermark_type: WatermarkSerdeType,
427}
428
429impl TableWatermarks {
430    pub fn single_epoch(
431        epoch: HummockEpoch,
432        watermarks: Vec<VnodeWatermark>,
433        direction: WatermarkDirection,
434        watermark_type: WatermarkSerdeType,
435    ) -> Self {
436        let mut this = Self {
437            direction,
438            watermarks: Vec::new(),
439            watermark_type,
440        };
441        this.add_new_epoch_watermarks(epoch, watermarks.into(), direction, watermark_type);
442        this
443    }
444
445    pub fn add_new_epoch_watermarks(
446        &mut self,
447        epoch: HummockEpoch,
448        watermarks: Arc<[VnodeWatermark]>,
449        direction: WatermarkDirection,
450        watermark_type: WatermarkSerdeType,
451    ) {
452        assert_eq!(self.direction, direction);
453        assert_eq!(self.watermark_type, watermark_type);
454
455        if let Some((prev_epoch, _)) = self.watermarks.last() {
456            assert!(*prev_epoch < epoch);
457        }
458        if !watermarks.is_empty() {
459            let vnode_count = watermarks[0].vnode_count();
460            for watermark in &*watermarks {
461                assert_eq!(watermark.vnode_count(), vnode_count);
462            }
463            if let Some(existing_vnode_count) = self.vnode_count() {
464                assert_eq!(existing_vnode_count, vnode_count);
465            }
466        }
467        self.watermarks.push((epoch, watermarks));
468    }
469
470    /// Vnode count derived from existing watermarks. Returns `None` if there is no watermark.
471    fn vnode_count(&self) -> Option<usize> {
472        self.watermarks
473            .iter()
474            .flat_map(|(_, watermarks)| watermarks.as_ref())
475            .next()
476            .map(|w| w.vnode_count())
477    }
478}
479
480pub fn merge_multiple_new_table_watermarks(
481    table_watermarks_list: impl IntoIterator<Item = HashMap<TableId, TableWatermarks>>,
482) -> HashMap<TableId, TableWatermarks> {
483    #[allow(clippy::type_complexity)]
484    let mut ret: HashMap<
485        TableId,
486        (
487            WatermarkDirection,
488            BTreeMap<u64, Vec<VnodeWatermark>>,
489            WatermarkSerdeType,
490        ),
491    > = HashMap::new();
492    for table_watermarks in table_watermarks_list {
493        for (table_id, new_table_watermarks) in table_watermarks {
494            let epoch_watermarks = match ret.entry(table_id) {
495                Entry::Occupied(entry) => {
496                    let (direction, epoch_watermarks, watermark_type) = entry.into_mut();
497                    assert_eq!(&new_table_watermarks.direction, direction);
498                    assert_eq!(&new_table_watermarks.watermark_type, watermark_type);
499                    epoch_watermarks
500                }
501                Entry::Vacant(entry) => {
502                    let (_, epoch_watermarks, _) = entry.insert((
503                        new_table_watermarks.direction,
504                        BTreeMap::new(),
505                        new_table_watermarks.watermark_type,
506                    ));
507                    epoch_watermarks
508                }
509            };
510            for (new_epoch, new_epoch_watermarks) in new_table_watermarks.watermarks {
511                epoch_watermarks
512                    .entry(new_epoch)
513                    .or_insert_with(Vec::new)
514                    .extend(new_epoch_watermarks.iter().cloned());
515            }
516        }
517    }
518    ret.into_iter()
519        .map(
520            |(table_id, (direction, epoch_watermarks, watermark_type))| {
521                (
522                    table_id,
523                    TableWatermarks {
524                        direction,
525                        // ordered from earlier epoch to later epoch
526                        watermarks: epoch_watermarks
527                            .into_iter()
528                            .map(|(epoch, watermarks)| (epoch, Arc::from(watermarks)))
529                            .collect(),
530                        watermark_type,
531                    },
532                )
533            },
534        )
535        .collect()
536}
537
538impl TableWatermarks {
539    pub fn apply_new_table_watermarks(&mut self, newly_added_watermarks: &TableWatermarks) {
540        assert_eq!(self.direction, newly_added_watermarks.direction);
541        assert!(self.watermarks.iter().map(|(epoch, _)| epoch).is_sorted());
542        assert!(
543            newly_added_watermarks
544                .watermarks
545                .iter()
546                .map(|(epoch, _)| epoch)
547                .is_sorted()
548        );
549        // ensure that the newly added watermarks have a later epoch than the previous latest epoch.
550        if let Some((prev_last_epoch, _)) = self.watermarks.last()
551            && let Some((new_first_epoch, _)) = newly_added_watermarks.watermarks.first()
552        {
553            assert!(prev_last_epoch < new_first_epoch);
554        }
555        self.watermarks.extend(
556            newly_added_watermarks
557                .watermarks
558                .iter()
559                .map(|(epoch, new_watermarks)| (*epoch, new_watermarks.clone())),
560        );
561    }
562
563    pub fn clear_stale_epoch_watermark(&mut self, safe_epoch: u64) {
564        match self.watermarks.first() {
565            None => {
566                // return on empty watermark
567                return;
568            }
569            Some((earliest_epoch, _)) => {
570                if *earliest_epoch >= safe_epoch {
571                    // No stale epoch watermark needs to be cleared.
572                    return;
573                }
574            }
575        }
576        debug!("clear stale table watermark below epoch {}", safe_epoch);
577        let mut result_epoch_watermark = Vec::with_capacity(self.watermarks.len());
578        let mut set_vnode: HashSet<VirtualNode> = HashSet::new();
579        let mut vnode_count: Option<usize> = None; // lazy initialized on first occurrence of vnode watermark
580        while let Some((epoch, _)) = self.watermarks.last() {
581            if *epoch >= safe_epoch {
582                let (epoch, watermarks) = self.watermarks.pop().expect("have check Some");
583                for watermark in watermarks.as_ref() {
584                    vnode_count.get_or_insert_with(|| watermark.vnode_count());
585                    for vnode in watermark.vnode_bitmap.iter_vnodes() {
586                        set_vnode.insert(vnode);
587                    }
588                }
589                result_epoch_watermark.push((epoch, watermarks));
590            } else {
591                break;
592            }
593        }
594        while vnode_count != Some(set_vnode.len())
595            && let Some((_, watermarks)) = self.watermarks.pop()
596        {
597            let mut new_vnode_watermarks = Vec::new();
598            for vnode_watermark in watermarks.as_ref() {
599                let mut new_set_vnode = Vec::new();
600                vnode_count.get_or_insert_with(|| vnode_watermark.vnode_count());
601                for vnode in vnode_watermark.vnode_bitmap.iter_vnodes() {
602                    if set_vnode.insert(vnode) {
603                        new_set_vnode.push(vnode);
604                    }
605                }
606                if !new_set_vnode.is_empty() {
607                    let mut builder = BitmapBuilder::zeroed(vnode_watermark.vnode_count());
608                    for vnode in new_set_vnode {
609                        builder.set(vnode.to_index(), true);
610                    }
611                    let bitmap = Arc::new(builder.finish());
612                    new_vnode_watermarks.push(VnodeWatermark {
613                        vnode_bitmap: bitmap,
614                        watermark: vnode_watermark.watermark.clone(),
615                    })
616                }
617            }
618            if !new_vnode_watermarks.is_empty() {
619                if let Some((last_epoch, last_watermarks)) = result_epoch_watermark.last_mut()
620                    && *last_epoch == safe_epoch
621                {
622                    *last_watermarks = Arc::from(
623                        last_watermarks
624                            .iter()
625                            .cloned()
626                            .chain(new_vnode_watermarks.into_iter())
627                            .collect_vec(),
628                    );
629                } else {
630                    result_epoch_watermark.push((safe_epoch, Arc::from(new_vnode_watermarks)));
631                }
632            }
633        }
634        // epoch watermark are added from later epoch to earlier epoch.
635        // reverse to ensure that earlier epochs are at the front
636        result_epoch_watermark.reverse();
637        assert!(
638            result_epoch_watermark
639                .is_sorted_by(|(first_epoch, _), (second_epoch, _)| { first_epoch < second_epoch })
640        );
641        *self = TableWatermarks {
642            watermarks: result_epoch_watermark,
643            direction: self.direction,
644            watermark_type: self.watermark_type,
645        }
646    }
647}
648
649impl TableWatermarks {
650    pub fn estimated_encode_len(&self) -> usize {
651        self.watermarks.len() * size_of::<HummockEpoch>()
652            + self
653                .watermarks
654                .iter()
655                .map(|(_, watermarks)| {
656                    watermarks
657                        .iter()
658                        .map(|watermark| watermark.estimated_size())
659                        .sum::<usize>()
660                })
661                .sum::<usize>()
662            + size_of::<bool>() // for direction
663    }
664
665    pub fn to_protobuf(&self) -> PbTableWatermarks {
666        self.into()
667    }
668}
669
670impl From<&PbTableWatermarks> for WatermarkSerdeType {
671    fn from(pb: &PbTableWatermarks) -> Self {
672        match pb.raw_watermark_serde_type() {
673            PbWatermarkSerdeType::TypeUnspecified => {
674                // For backward compatibility.
675                #[expect(deprecated)]
676                if pb.is_non_pk_prefix {
677                    WatermarkSerdeType::NonPkPrefix
678                } else {
679                    WatermarkSerdeType::PkPrefix
680                }
681            }
682            PbWatermarkSerdeType::PkPrefix => WatermarkSerdeType::PkPrefix,
683            PbWatermarkSerdeType::NonPkPrefix => WatermarkSerdeType::NonPkPrefix,
684            PbWatermarkSerdeType::Value => WatermarkSerdeType::Value,
685        }
686    }
687}
688impl From<&PbTableWatermarks> for TableWatermarks {
689    fn from(pb: &PbTableWatermarks) -> Self {
690        let watermark_type = WatermarkSerdeType::from(pb);
691        Self {
692            watermarks: pb
693                .epoch_watermarks
694                .iter()
695                .map(|epoch_watermark| {
696                    let epoch = epoch_watermark.epoch;
697                    let watermarks = epoch_watermark
698                        .watermarks
699                        .iter()
700                        .map(VnodeWatermark::from)
701                        .collect();
702                    (epoch, watermarks)
703                })
704                .collect(),
705            direction: if pb.is_ascending {
706                WatermarkDirection::Ascending
707            } else {
708                WatermarkDirection::Descending
709            },
710            watermark_type,
711        }
712    }
713}
714
715impl From<WatermarkSerdeType> for PbWatermarkSerdeType {
716    fn from(s: WatermarkSerdeType) -> Self {
717        match s {
718            WatermarkSerdeType::PkPrefix => PbWatermarkSerdeType::PkPrefix,
719            WatermarkSerdeType::NonPkPrefix => PbWatermarkSerdeType::NonPkPrefix,
720            WatermarkSerdeType::Value => PbWatermarkSerdeType::Value,
721        }
722    }
723}
724
725impl From<&TableWatermarks> for PbTableWatermarks {
726    fn from(table_watermarks: &TableWatermarks) -> Self {
727        let is_non_pk_prefix = match table_watermarks.watermark_type {
728            WatermarkSerdeType::PkPrefix => false,
729            WatermarkSerdeType::NonPkPrefix => true,
730            WatermarkSerdeType::Value => false,
731        };
732        Self {
733            epoch_watermarks: table_watermarks
734                .watermarks
735                .iter()
736                .map(|(epoch, watermarks)| PbEpochNewWatermarks {
737                    watermarks: watermarks.iter().map(|wm| wm.into()).collect(),
738                    epoch: *epoch,
739                })
740                .collect(),
741            is_ascending: match table_watermarks.direction {
742                WatermarkDirection::Ascending => true,
743                WatermarkDirection::Descending => false,
744            },
745            #[expect(deprecated)]
746            is_non_pk_prefix,
747            raw_watermark_serde_type: PbWatermarkSerdeType::from(table_watermarks.watermark_type)
748                as i32,
749        }
750    }
751}
752
753impl From<PbTableWatermarks> for TableWatermarks {
754    fn from(pb: PbTableWatermarks) -> Self {
755        let watermark_type = WatermarkSerdeType::from(&pb);
756        Self {
757            watermarks: pb
758                .epoch_watermarks
759                .into_iter()
760                .map(|epoch_watermark| {
761                    let epoch = epoch_watermark.epoch;
762                    let watermarks = epoch_watermark
763                        .watermarks
764                        .into_iter()
765                        .map(VnodeWatermark::from)
766                        .collect();
767                    (epoch, watermarks)
768                })
769                .collect(),
770            direction: if pb.is_ascending {
771                WatermarkDirection::Ascending
772            } else {
773                WatermarkDirection::Descending
774            },
775            watermark_type,
776        }
777    }
778}
779
780impl From<TableWatermarks> for PbTableWatermarks {
781    fn from(table_watermarks: TableWatermarks) -> Self {
782        Self {
783            epoch_watermarks: table_watermarks
784                .watermarks
785                .into_iter()
786                .map(|(epoch, watermarks)| PbEpochNewWatermarks {
787                    watermarks: watermarks.iter().map(PbVnodeWatermark::from).collect(),
788                    epoch,
789                })
790                .collect(),
791            is_ascending: match table_watermarks.direction {
792                WatermarkDirection::Ascending => true,
793                WatermarkDirection::Descending => false,
794            },
795            #[expect(deprecated)]
796            is_non_pk_prefix: match table_watermarks.watermark_type {
797                WatermarkSerdeType::NonPkPrefix => true,
798                WatermarkSerdeType::PkPrefix => false,
799                WatermarkSerdeType::Value => false,
800            },
801            raw_watermark_serde_type: PbWatermarkSerdeType::from(table_watermarks.watermark_type)
802                as i32,
803        }
804    }
805}
806
807#[derive(Debug, Clone, Copy, PartialEq, Eq)]
808pub enum WatermarkSerdeType {
809    PkPrefix,
810    NonPkPrefix,
811    Value,
812}
813
814#[cfg(test)]
815mod tests {
816    use std::collections::Bound::Included;
817    use std::collections::{Bound, HashMap};
818    use std::ops::Bound::Excluded;
819    use std::sync::Arc;
820    use std::vec;
821
822    use bytes::Bytes;
823    use itertools::Itertools;
824    use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
825    use risingwave_common::catalog::TableId;
826    use risingwave_common::hash::VirtualNode;
827    use risingwave_common::util::epoch::{EpochExt, test_epoch};
828    use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo};
829
830    use crate::compaction_group::StaticCompactionGroupId;
831    use crate::key::{TableKeyRange, is_empty_key_range, prefixed_range_with_vnode};
832    use crate::table_watermark::{
833        TableWatermarks, TableWatermarksIndex, VnodeWatermark, WatermarkDirection,
834        WatermarkSerdeType, merge_multiple_new_table_watermarks,
835    };
836    use crate::version::HummockVersion;
837
838    fn build_bitmap(vnodes: impl IntoIterator<Item = usize>) -> Arc<Bitmap> {
839        let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
840        for vnode in vnodes {
841            builder.set(vnode, true);
842        }
843        Arc::new(builder.finish())
844    }
845
846    #[test]
847    fn test_apply_new_table_watermark() {
848        let epoch1 = test_epoch(1);
849        let direction = WatermarkDirection::Ascending;
850        let watermark1 = Bytes::from("watermark1");
851        let watermark2 = Bytes::from("watermark2");
852        let watermark3 = Bytes::from("watermark3");
853        let watermark4 = Bytes::from("watermark4");
854        let watermark_type = WatermarkSerdeType::PkPrefix;
855        let mut table_watermarks = TableWatermarks::single_epoch(
856            epoch1,
857            vec![VnodeWatermark::new(build_bitmap(vec![0, 1, 2]), watermark1)],
858            direction,
859            watermark_type,
860        );
861        let epoch2 = epoch1.next_epoch();
862        table_watermarks.add_new_epoch_watermarks(
863            epoch2,
864            vec![VnodeWatermark::new(
865                build_bitmap(vec![0, 1, 2, 3]),
866                watermark2,
867            )]
868            .into(),
869            direction,
870            watermark_type,
871        );
872
873        let mut table_watermark_checkpoint = table_watermarks.clone();
874
875        let epoch3 = epoch2.next_epoch();
876        let mut second_table_watermark = TableWatermarks::single_epoch(
877            epoch3,
878            vec![VnodeWatermark::new(
879                build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
880                watermark3.clone(),
881            )],
882            direction,
883            watermark_type,
884        );
885        table_watermarks.add_new_epoch_watermarks(
886            epoch3,
887            vec![VnodeWatermark::new(
888                build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
889                watermark3,
890            )]
891            .into(),
892            direction,
893            watermark_type,
894        );
895        let epoch4 = epoch3.next_epoch();
896        let epoch5 = epoch4.next_epoch();
897        table_watermarks.add_new_epoch_watermarks(
898            epoch5,
899            vec![VnodeWatermark::new(
900                build_bitmap(vec![0, 3, 4]),
901                watermark4.clone(),
902            )]
903            .into(),
904            direction,
905            watermark_type,
906        );
907        second_table_watermark.add_new_epoch_watermarks(
908            epoch5,
909            vec![VnodeWatermark::new(build_bitmap(vec![0, 3, 4]), watermark4)].into(),
910            direction,
911            watermark_type,
912        );
913
914        table_watermark_checkpoint.apply_new_table_watermarks(&second_table_watermark);
915        assert_eq!(table_watermarks, table_watermark_checkpoint);
916    }
917
918    #[test]
919    fn test_clear_stale_epoch_watmermark() {
920        let epoch1 = test_epoch(1);
921        let direction = WatermarkDirection::Ascending;
922        let watermark1 = Bytes::from("watermark1");
923        let watermark2 = Bytes::from("watermark2");
924        let watermark3 = Bytes::from("watermark3");
925        let watermark4 = Bytes::from("watermark4");
926        let watermark_type = WatermarkSerdeType::PkPrefix;
927        let mut table_watermarks = TableWatermarks::single_epoch(
928            epoch1,
929            vec![VnodeWatermark::new(build_bitmap(vec![0, 1, 2]), watermark1)],
930            direction,
931            watermark_type,
932        );
933        let epoch2 = epoch1.next_epoch();
934        table_watermarks.add_new_epoch_watermarks(
935            epoch2,
936            vec![VnodeWatermark::new(
937                build_bitmap(vec![0, 1, 2, 3]),
938                watermark2.clone(),
939            )]
940            .into(),
941            direction,
942            watermark_type,
943        );
944        let epoch3 = epoch2.next_epoch();
945        table_watermarks.add_new_epoch_watermarks(
946            epoch3,
947            vec![VnodeWatermark::new(
948                build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
949                watermark3.clone(),
950            )]
951            .into(),
952            direction,
953            watermark_type,
954        );
955        let epoch4 = epoch3.next_epoch();
956        let epoch5 = epoch4.next_epoch();
957        table_watermarks.add_new_epoch_watermarks(
958            epoch5,
959            vec![VnodeWatermark::new(
960                build_bitmap(vec![0, 3, 4]),
961                watermark4.clone(),
962            )]
963            .into(),
964            direction,
965            watermark_type,
966        );
967
968        let mut table_watermarks_checkpoint = table_watermarks.clone();
969        table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch1);
970        assert_eq!(table_watermarks_checkpoint, table_watermarks);
971
972        table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch2);
973        assert_eq!(
974            table_watermarks_checkpoint,
975            TableWatermarks {
976                watermarks: vec![
977                    (
978                        epoch2,
979                        vec![VnodeWatermark::new(
980                            build_bitmap(vec![0, 1, 2, 3]),
981                            watermark2,
982                        )]
983                        .into()
984                    ),
985                    (
986                        epoch3,
987                        vec![VnodeWatermark::new(
988                            build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
989                            watermark3.clone(),
990                        )]
991                        .into()
992                    ),
993                    (
994                        epoch5,
995                        vec![VnodeWatermark::new(
996                            build_bitmap(vec![0, 3, 4]),
997                            watermark4.clone(),
998                        )]
999                        .into()
1000                    )
1001                ],
1002                direction,
1003                watermark_type,
1004            }
1005        );
1006
1007        table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch3);
1008        assert_eq!(
1009            table_watermarks_checkpoint,
1010            TableWatermarks {
1011                watermarks: vec![
1012                    (
1013                        epoch3,
1014                        vec![VnodeWatermark::new(
1015                            build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
1016                            watermark3.clone(),
1017                        )]
1018                        .into()
1019                    ),
1020                    (
1021                        epoch5,
1022                        vec![VnodeWatermark::new(
1023                            build_bitmap(vec![0, 3, 4]),
1024                            watermark4.clone(),
1025                        )]
1026                        .into()
1027                    )
1028                ],
1029                direction,
1030                watermark_type,
1031            }
1032        );
1033
1034        table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch4);
1035        assert_eq!(
1036            table_watermarks_checkpoint,
1037            TableWatermarks {
1038                watermarks: vec![
1039                    (
1040                        epoch4,
1041                        vec![VnodeWatermark::new(
1042                            build_bitmap((1..3).chain(5..VirtualNode::COUNT_FOR_TEST)),
1043                            watermark3.clone()
1044                        )]
1045                        .into()
1046                    ),
1047                    (
1048                        epoch5,
1049                        vec![VnodeWatermark::new(
1050                            build_bitmap(vec![0, 3, 4]),
1051                            watermark4.clone(),
1052                        )]
1053                        .into()
1054                    )
1055                ],
1056                direction,
1057                watermark_type,
1058            }
1059        );
1060
1061        table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch5);
1062        assert_eq!(
1063            table_watermarks_checkpoint,
1064            TableWatermarks {
1065                watermarks: vec![(
1066                    epoch5,
1067                    vec![
1068                        VnodeWatermark::new(build_bitmap(vec![0, 3, 4]), watermark4),
1069                        VnodeWatermark::new(
1070                            build_bitmap((1..3).chain(5..VirtualNode::COUNT_FOR_TEST)),
1071                            watermark3
1072                        )
1073                    ]
1074                    .into()
1075                )],
1076                direction,
1077                watermark_type,
1078            }
1079        );
1080    }
1081
1082    #[test]
1083    fn test_merge_multiple_new_table_watermarks() {
1084        fn epoch_new_watermark(epoch: u64, bitmaps: Vec<&Bitmap>) -> (u64, Arc<[VnodeWatermark]>) {
1085            (
1086                epoch,
1087                bitmaps
1088                    .into_iter()
1089                    .map(|bitmap| VnodeWatermark {
1090                        watermark: Bytes::from(vec![1, 2, epoch as _]),
1091                        vnode_bitmap: Arc::new(bitmap.clone()),
1092                    })
1093                    .collect_vec()
1094                    .into(),
1095            )
1096        }
1097        fn build_table_watermark(
1098            vnodes: impl IntoIterator<Item = usize>,
1099            epochs: impl IntoIterator<Item = u64>,
1100        ) -> TableWatermarks {
1101            let bitmap = build_bitmap(vnodes);
1102            TableWatermarks {
1103                watermarks: epochs
1104                    .into_iter()
1105                    .map(|epoch: u64| epoch_new_watermark(epoch, vec![&bitmap]))
1106                    .collect(),
1107                direction: WatermarkDirection::Ascending,
1108                watermark_type: WatermarkSerdeType::PkPrefix,
1109            }
1110        }
1111        let table1_watermark1 = build_table_watermark(0..3, vec![1, 2, 4]);
1112        let table1_watermark2 = build_table_watermark(4..6, vec![1, 2, 5]);
1113        let table2_watermark = build_table_watermark(0..4, 1..3);
1114        let table3_watermark = build_table_watermark(0..4, 3..5);
1115        let mut first = HashMap::new();
1116        first.insert(TableId::new(1), table1_watermark1);
1117        first.insert(TableId::new(2), table2_watermark.clone());
1118        let mut second = HashMap::new();
1119        second.insert(TableId::new(1), table1_watermark2);
1120        second.insert(TableId::new(3), table3_watermark.clone());
1121        let result = merge_multiple_new_table_watermarks(vec![first, second]);
1122        let mut expected = HashMap::new();
1123        expected.insert(
1124            TableId::new(1),
1125            TableWatermarks {
1126                watermarks: vec![
1127                    epoch_new_watermark(1, vec![&build_bitmap(0..3), &build_bitmap(4..6)]),
1128                    epoch_new_watermark(2, vec![&build_bitmap(0..3), &build_bitmap(4..6)]),
1129                    epoch_new_watermark(4, vec![&build_bitmap(0..3)]),
1130                    epoch_new_watermark(5, vec![&build_bitmap(4..6)]),
1131                ],
1132                direction: WatermarkDirection::Ascending,
1133                watermark_type: WatermarkSerdeType::PkPrefix,
1134            },
1135        );
1136        expected.insert(TableId::new(2), table2_watermark);
1137        expected.insert(TableId::new(3), table3_watermark);
1138        assert_eq!(result, expected);
1139    }
1140
1141    const COMMITTED_EPOCH: u64 = test_epoch(1);
1142    const EPOCH1: u64 = test_epoch(2);
1143    const EPOCH2: u64 = test_epoch(3);
1144    const TEST_SINGLE_VNODE: VirtualNode = VirtualNode::from_index(1);
1145
1146    fn build_watermark_range(
1147        direction: WatermarkDirection,
1148        (low, high): (Bound<Bytes>, Bound<Bytes>),
1149    ) -> TableKeyRange {
1150        let range = match direction {
1151            WatermarkDirection::Ascending => (low, high),
1152            WatermarkDirection::Descending => (high, low),
1153        };
1154        prefixed_range_with_vnode(range, TEST_SINGLE_VNODE)
1155    }
1156
1157    /// Build and return a watermark index with the following watermarks
1158    /// EPOCH1 bitmap(0, 1, 2, 3) watermark1
1159    /// EPOCH2 bitmap(1, 2, 3, 4) watermark2
1160    fn build_and_test_watermark_index(
1161        direction: WatermarkDirection,
1162        watermark1: Bytes,
1163        watermark2: Bytes,
1164        watermark3: Bytes,
1165    ) -> TableWatermarksIndex {
1166        let mut index = TableWatermarksIndex::new(
1167            direction,
1168            EPOCH1,
1169            vec![VnodeWatermark::new(build_bitmap(0..4), watermark1.clone())],
1170            Some(COMMITTED_EPOCH),
1171            WatermarkSerdeType::PkPrefix,
1172        );
1173        index.add_epoch_watermark(
1174            EPOCH2,
1175            vec![VnodeWatermark::new(build_bitmap(1..5), watermark2.clone())].into(),
1176            direction,
1177        );
1178
1179        assert_eq!(
1180            index.read_watermark(VirtualNode::from_index(0), EPOCH1),
1181            Some(watermark1.clone())
1182        );
1183        assert_eq!(
1184            index.read_watermark(VirtualNode::from_index(1), EPOCH1),
1185            Some(watermark1.clone())
1186        );
1187        assert_eq!(
1188            index.read_watermark(VirtualNode::from_index(4), EPOCH1),
1189            None
1190        );
1191        assert_eq!(
1192            index.read_watermark(VirtualNode::from_index(0), EPOCH2),
1193            Some(watermark1.clone())
1194        );
1195        assert_eq!(
1196            index.read_watermark(VirtualNode::from_index(1), EPOCH2),
1197            Some(watermark2.clone())
1198        );
1199        assert_eq!(
1200            index.read_watermark(VirtualNode::from_index(4), EPOCH2),
1201            Some(watermark2.clone())
1202        );
1203        assert_eq!(
1204            index.latest_watermark(VirtualNode::from_index(0)),
1205            Some(watermark1.clone())
1206        );
1207        assert_eq!(
1208            index.latest_watermark(VirtualNode::from_index(1)),
1209            Some(watermark2.clone())
1210        );
1211        assert_eq!(
1212            index.latest_watermark(VirtualNode::from_index(4)),
1213            Some(watermark2.clone())
1214        );
1215
1216        // watermark is watermark2
1217        let check_watermark_range =
1218            |query_range: (Bound<Bytes>, Bound<Bytes>),
1219             output_range: Option<(Bound<Bytes>, Bound<Bytes>)>| {
1220                let mut range = build_watermark_range(direction, query_range);
1221                index.rewrite_range_with_table_watermark(EPOCH2, &mut range);
1222                if let Some(output_range) = output_range {
1223                    assert_eq!(range, build_watermark_range(direction, output_range));
1224                } else {
1225                    assert!(is_empty_key_range(&range));
1226                }
1227            };
1228
1229        // test read from single vnode and truncate begin key range
1230        check_watermark_range(
1231            (Included(watermark1.clone()), Excluded(watermark3.clone())),
1232            Some((Included(watermark2.clone()), Excluded(watermark3.clone()))),
1233        );
1234
1235        // test read from single vnode and begin key right at watermark
1236        check_watermark_range(
1237            (Included(watermark2.clone()), Excluded(watermark3.clone())),
1238            Some((Included(watermark2.clone()), Excluded(watermark3.clone()))),
1239        );
1240        check_watermark_range(
1241            (Excluded(watermark2.clone()), Excluded(watermark3.clone())),
1242            Some((Excluded(watermark2.clone()), Excluded(watermark3))),
1243        );
1244
1245        // test read from single vnode and end key right at watermark
1246        check_watermark_range(
1247            (Excluded(watermark1.clone()), Excluded(watermark2.clone())),
1248            None,
1249        );
1250        check_watermark_range(
1251            (Excluded(watermark1), Included(watermark2.clone())),
1252            Some((Included(watermark2.clone()), Included(watermark2))),
1253        );
1254
1255        index
1256    }
1257
1258    #[test]
1259    fn test_watermark_index_ascending() {
1260        let watermark1 = Bytes::from_static(b"watermark1");
1261        let watermark2 = Bytes::from_static(b"watermark2");
1262        let watermark3 = Bytes::from_static(b"watermark3");
1263        build_and_test_watermark_index(
1264            WatermarkDirection::Ascending,
1265            watermark1,
1266            watermark2,
1267            watermark3,
1268        );
1269    }
1270
1271    #[test]
1272    fn test_watermark_index_descending() {
1273        let watermark1 = Bytes::from_static(b"watermark254");
1274        let watermark2 = Bytes::from_static(b"watermark253");
1275        let watermark3 = Bytes::from_static(b"watermark252");
1276        build_and_test_watermark_index(
1277            WatermarkDirection::Descending,
1278            watermark1,
1279            watermark2,
1280            watermark3,
1281        );
1282    }
1283
1284    #[test]
1285    fn test_apply_committed_index() {
1286        let watermark1 = Bytes::from_static(b"watermark1");
1287        let watermark2 = Bytes::from_static(b"watermark2");
1288        let watermark3 = Bytes::from_static(b"watermark3");
1289        let mut index = build_and_test_watermark_index(
1290            WatermarkDirection::Ascending,
1291            watermark1.clone(),
1292            watermark2.clone(),
1293            watermark3,
1294        );
1295
1296        let test_table_id = TableId::from(233);
1297
1298        let mut version = HummockVersion::from_rpc_protobuf(&PbHummockVersion {
1299            state_table_info: HashMap::from_iter([(
1300                test_table_id,
1301                StateTableInfo {
1302                    committed_epoch: EPOCH1,
1303                    compaction_group_id: StaticCompactionGroupId::StateDefault,
1304                },
1305            )]),
1306            ..Default::default()
1307        });
1308        version.table_watermarks.insert(
1309            test_table_id,
1310            TableWatermarks {
1311                watermarks: vec![(
1312                    EPOCH1,
1313                    vec![VnodeWatermark {
1314                        watermark: watermark1.clone(),
1315                        vnode_bitmap: build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
1316                    }]
1317                    .into(),
1318                )],
1319                direction: WatermarkDirection::Ascending,
1320                watermark_type: WatermarkSerdeType::PkPrefix,
1321            }
1322            .into(),
1323        );
1324        index.apply_committed_watermarks(
1325            version
1326                .table_watermarks
1327                .get(&test_table_id)
1328                .unwrap()
1329                .clone(),
1330            EPOCH1,
1331        );
1332        assert_eq!(EPOCH1, index.committed_epoch.unwrap());
1333        assert_eq!(EPOCH2, index.latest_epoch);
1334        for vnode in 0..VirtualNode::COUNT_FOR_TEST {
1335            let vnode = VirtualNode::from_index(vnode);
1336            if (1..5).contains(&vnode.to_index()) {
1337                assert_eq!(watermark1, index.read_watermark(vnode, EPOCH1).unwrap());
1338                assert_eq!(watermark2, index.read_watermark(vnode, EPOCH2).unwrap());
1339            } else {
1340                assert_eq!(watermark1, index.read_watermark(vnode, EPOCH1).unwrap());
1341            }
1342        }
1343    }
1344
1345    #[test]
1346    fn test_filter_regress_watermark() {
1347        let watermark1 = Bytes::from_static(b"watermark1");
1348        let watermark2 = Bytes::from_static(b"watermark2");
1349        let watermark3 = Bytes::from_static(b"watermark3");
1350        let index = build_and_test_watermark_index(
1351            WatermarkDirection::Ascending,
1352            watermark1.clone(),
1353            watermark2,
1354            watermark3.clone(),
1355        );
1356
1357        let mut new_watermarks = vec![
1358            // Partial regress
1359            VnodeWatermark {
1360                vnode_bitmap: build_bitmap(0..2),
1361                watermark: watermark1.clone(),
1362            },
1363            // All not regress
1364            VnodeWatermark {
1365                vnode_bitmap: build_bitmap(2..4),
1366                watermark: watermark3.clone(),
1367            },
1368            // All regress
1369            VnodeWatermark {
1370                vnode_bitmap: build_bitmap(4..5),
1371                watermark: watermark1.clone(),
1372            },
1373            // All newly set vnode
1374            VnodeWatermark {
1375                vnode_bitmap: build_bitmap(5..6),
1376                watermark: watermark3.clone(),
1377            },
1378        ];
1379
1380        index.filter_regress_watermarks(&mut new_watermarks);
1381
1382        assert_eq!(
1383            new_watermarks,
1384            vec![
1385                VnodeWatermark {
1386                    vnode_bitmap: build_bitmap(0..1),
1387                    watermark: watermark1,
1388                },
1389                VnodeWatermark {
1390                    vnode_bitmap: build_bitmap(2..4),
1391                    watermark: watermark3.clone(),
1392                },
1393                VnodeWatermark {
1394                    vnode_bitmap: build_bitmap(5..6),
1395                    watermark: watermark3,
1396                },
1397            ]
1398        );
1399    }
1400}