risingwave_hummock_sdk/
table_watermark.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
17use std::fmt::Display;
18use std::mem::size_of;
19use std::ops::Bound::{Excluded, Included, Unbounded};
20use std::sync::Arc;
21
22use bytes::Bytes;
23use itertools::Itertools;
24use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
25use risingwave_common::catalog::TableId;
26use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
27use risingwave_common::types::ToDatumRef;
28use risingwave_common::util::sort_util::{OrderType, cmp_datum};
29use risingwave_common_estimate_size::EstimateSize;
30use risingwave_pb::hummock::table_watermarks::PbEpochNewWatermarks;
31use risingwave_pb::hummock::{PbVnodeWatermark, TableWatermarks as PbTableWatermarks};
32use tracing::{debug, warn};
33
34use crate::HummockEpoch;
35use crate::key::{TableKey, TableKeyRange, prefix_slice_with_vnode, vnode};
36
/// Per-vnode watermark values together with the direction used to interpret them.
#[derive(Clone)]
pub struct ReadTableWatermark {
    /// Whether the watermark is ascending or descending.
    pub direction: WatermarkDirection,
    /// Watermark bytes keyed by virtual node, ordered by vnode.
    pub vnode_watermarks: BTreeMap<VirtualNode, Bytes>,
}
42
/// Index over watermarks that combines per-epoch staging watermarks with the
/// most recently applied committed [`TableWatermarks`].
///
/// Committed watermarks installed into this index are asserted to have
/// [`WatermarkSerdeType::PkPrefix`].
#[derive(Clone)]
pub struct PkPrefixTableWatermarksIndex {
    /// Direction shared by every watermark held in this index.
    pub watermark_direction: WatermarkDirection,
    /// Watermarks added per epoch that are newer than the committed epoch.
    // later epoch at the back
    pub staging_watermarks: VecDeque<(HummockEpoch, Arc<[VnodeWatermark]>)>,
    /// Committed watermarks, if any have been applied.
    pub committed_watermarks: Option<Arc<TableWatermarks>>,
    /// Largest epoch observed so far, whether added as staging or applied as committed.
    latest_epoch: HummockEpoch,
    /// Epoch of the last applied committed watermarks, if any.
    committed_epoch: Option<HummockEpoch>,
}
52
53impl PkPrefixTableWatermarksIndex {
54    pub fn new(
55        watermark_direction: WatermarkDirection,
56        first_epoch: HummockEpoch,
57        first_vnode_watermark: Vec<VnodeWatermark>,
58        committed_epoch: Option<HummockEpoch>,
59    ) -> Self {
60        if let Some(committed_epoch) = committed_epoch {
61            assert!(first_epoch > committed_epoch);
62        }
63        Self {
64            watermark_direction,
65            staging_watermarks: VecDeque::from_iter([(
66                first_epoch,
67                Arc::from(first_vnode_watermark),
68            )]),
69            committed_watermarks: None,
70            latest_epoch: first_epoch,
71            committed_epoch,
72        }
73    }
74
75    pub fn new_committed(
76        committed_watermarks: Arc<TableWatermarks>,
77        committed_epoch: HummockEpoch,
78    ) -> Self {
79        assert_eq!(
80            committed_watermarks.watermark_type,
81            WatermarkSerdeType::PkPrefix
82        );
83        Self {
84            watermark_direction: committed_watermarks.direction,
85            staging_watermarks: VecDeque::new(),
86            committed_epoch: Some(committed_epoch),
87            latest_epoch: committed_epoch,
88            committed_watermarks: Some(committed_watermarks),
89        }
90    }
91
92    pub fn read_watermark(&self, vnode: VirtualNode, epoch: HummockEpoch) -> Option<Bytes> {
93        // iterate from new epoch to old epoch
94        for (watermark_epoch, vnode_watermark_list) in self.staging_watermarks.iter().rev().chain(
95            self.committed_watermarks
96                .iter()
97                .flat_map(|watermarks| watermarks.watermarks.iter().rev()),
98        ) {
99            if *watermark_epoch > epoch {
100                continue;
101            }
102            for vnode_watermark in vnode_watermark_list.as_ref() {
103                if vnode_watermark.vnode_bitmap.is_set(vnode.to_index()) {
104                    return Some(vnode_watermark.watermark.clone());
105                }
106            }
107        }
108        None
109    }
110
111    pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
112        self.read_watermark(vnode, HummockEpoch::MAX)
113    }
114
115    pub fn rewrite_range_with_table_watermark(
116        &self,
117        epoch: HummockEpoch,
118        key_range: &mut TableKeyRange,
119    ) {
120        let vnode = vnode(key_range);
121        if let Some(watermark) = self.read_watermark(vnode, epoch) {
122            match self.watermark_direction {
123                WatermarkDirection::Ascending => {
124                    let overwrite_start_key = match &key_range.0 {
125                        Included(start_key) | Excluded(start_key) => {
126                            start_key.key_part() < watermark
127                        }
128                        Unbounded => true,
129                    };
130                    if overwrite_start_key {
131                        let watermark_key = TableKey(prefix_slice_with_vnode(vnode, &watermark));
132                        let fully_filtered = match &key_range.1 {
133                            Included(end_key) => end_key < &watermark_key,
134                            Excluded(end_key) => end_key <= &watermark_key,
135                            Unbounded => false,
136                        };
137                        if fully_filtered {
138                            key_range.1 = Excluded(watermark_key.clone());
139                        }
140                        key_range.0 = Included(watermark_key);
141                    }
142                }
143                WatermarkDirection::Descending => {
144                    let overwrite_end_key = match &key_range.1 {
145                        Included(end_key) | Excluded(end_key) => end_key.key_part() > watermark,
146                        Unbounded => true,
147                    };
148                    if overwrite_end_key {
149                        let watermark_key = TableKey(prefix_slice_with_vnode(vnode, &watermark));
150                        let fully_filtered = match &key_range.0 {
151                            Included(start_key) => start_key > &watermark_key,
152                            Excluded(start_key) => start_key >= &watermark_key,
153                            Unbounded => false,
154                        };
155                        if fully_filtered {
156                            *key_range = (Included(watermark_key.clone()), Excluded(watermark_key));
157                        } else {
158                            key_range.1 = Included(watermark_key);
159                        }
160                    }
161                }
162            }
163        }
164    }
165
166    pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
167        let mut ret = Vec::with_capacity(watermarks.len());
168        for watermark in watermarks.drain(..) {
169            let vnode_count = watermark.vnode_count();
170
171            let mut regress_vnodes = None;
172            for vnode in watermark.vnode_bitmap.iter_vnodes() {
173                if let Some(prev_watermark) = self.latest_watermark(vnode) {
174                    let is_regress = match self.direction() {
175                        WatermarkDirection::Ascending => prev_watermark > watermark.watermark,
176                        WatermarkDirection::Descending => prev_watermark < watermark.watermark,
177                    };
178                    if is_regress {
179                        warn!(
180                            "table watermark regress: {:?} {} {:?} {:?}",
181                            self.direction(),
182                            vnode,
183                            watermark.watermark,
184                            prev_watermark
185                        );
186                        regress_vnodes
187                            .get_or_insert_with(|| BitmapBuilder::zeroed(vnode_count))
188                            .set(vnode.to_index(), true);
189                    }
190                }
191            }
192            if let Some(regress_vnodes) = regress_vnodes {
193                let mut bitmap_builder = None;
194                for vnode in watermark.vnode_bitmap.iter_vnodes() {
195                    let vnode_index = vnode.to_index();
196                    if !regress_vnodes.is_set(vnode_index) {
197                        bitmap_builder
198                            .get_or_insert_with(|| BitmapBuilder::zeroed(vnode_count))
199                            .set(vnode_index, true);
200                    }
201                }
202                if let Some(bitmap_builder) = bitmap_builder {
203                    ret.push(VnodeWatermark::new(
204                        Arc::new(bitmap_builder.finish()),
205                        watermark.watermark,
206                    ));
207                }
208            } else {
209                // no vnode has regress watermark
210                ret.push(watermark);
211            }
212        }
213        *watermarks = ret;
214    }
215
    /// Returns the watermark direction of this index.
    pub fn direction(&self) -> WatermarkDirection {
        self.watermark_direction
    }
219
    /// Appends the watermarks of a new epoch to the staging watermarks and makes
    /// `epoch` the latest epoch of the index.
    ///
    /// # Panics
    ///
    /// Panics if `epoch` is not strictly greater than the current latest epoch, or if
    /// `direction` differs from the direction of this index.
    ///
    /// In builds with debug assertions enabled, it additionally panics if a vnode is set
    /// in more than one entry of `vnode_watermark_list`, or if a new watermark regresses
    /// relative to the latest watermark already known for the same vnode.
    pub fn add_epoch_watermark(
        &mut self,
        epoch: HummockEpoch,
        vnode_watermark_list: Arc<[VnodeWatermark]>,
        direction: WatermarkDirection,
    ) {
        assert!(epoch > self.latest_epoch);
        assert_eq!(self.watermark_direction, direction);
        self.latest_epoch = epoch;
        // Sanity checks only; they run before the new epoch is pushed, so
        // `latest_watermark` still reflects the previously known state.
        #[cfg(debug_assertions)]
        if !vnode_watermark_list.is_empty() {
            let vnode_count = vnode_watermark_list[0].vnode_count();
            // Tracks vnodes already covered by an earlier entry of the list.
            let mut vnode_is_set = BitmapBuilder::zeroed(vnode_count);
            for vnode_watermark in vnode_watermark_list.as_ref() {
                for vnode in vnode_watermark.vnode_bitmap.iter_ones() {
                    // Each vnode may appear in at most one entry of the list.
                    assert!(!vnode_is_set.is_set(vnode));
                    vnode_is_set.set(vnode, true);
                    let vnode = VirtualNode::from_index(vnode);
                    if let Some(prev_watermark) = self.latest_watermark(vnode) {
                        // The new watermark must not move backwards for this vnode.
                        match self.watermark_direction {
                            WatermarkDirection::Ascending => {
                                assert!(vnode_watermark.watermark >= prev_watermark);
                            }
                            WatermarkDirection::Descending => {
                                assert!(vnode_watermark.watermark <= prev_watermark);
                            }
                        };
                    }
                }
            }
        }
        self.staging_watermarks
            .push_back((epoch, vnode_watermark_list));
    }
254
255    pub fn apply_committed_watermarks(
256        &mut self,
257        committed_watermark: Arc<TableWatermarks>,
258        committed_epoch: HummockEpoch,
259    ) {
260        assert_eq!(
261            committed_watermark.watermark_type,
262            WatermarkSerdeType::PkPrefix
263        );
264        assert_eq!(self.watermark_direction, committed_watermark.direction);
265        if let Some(prev_committed_epoch) = self.committed_epoch {
266            assert!(prev_committed_epoch <= committed_epoch);
267            if prev_committed_epoch == committed_epoch {
268                return;
269            }
270        }
271        if self.latest_epoch < committed_epoch {
272            debug!(
273                latest_epoch = self.latest_epoch,
274                committed_epoch, "committed_epoch exceed table watermark latest_epoch"
275            );
276            self.latest_epoch = committed_epoch;
277        }
278        self.committed_epoch = Some(committed_epoch);
279        self.committed_watermarks = Some(committed_watermark);
280        // keep only watermark higher than committed epoch
281        while let Some((old_epoch, _)) = self.staging_watermarks.front()
282            && *old_epoch <= committed_epoch
283        {
284            let _ = self.staging_watermarks.pop_front();
285        }
286    }
287}
288
/// Whether a watermark is ascending or descending.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum WatermarkDirection {
    Ascending,
    Descending,
}

impl Display for WatermarkDirection {
    /// Writes the variant name (`Ascending` or `Descending`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::Ascending => "Ascending",
            Self::Descending => "Descending",
        };
        f.write_str(name)
    }
}
303
304impl WatermarkDirection {
305    pub fn key_filter_by_watermark(
306        &self,
307        key: impl AsRef<[u8]>,
308        watermark: impl AsRef<[u8]>,
309    ) -> bool {
310        let key = key.as_ref();
311        let watermark = watermark.as_ref();
312        match self {
313            WatermarkDirection::Ascending => key < watermark,
314            WatermarkDirection::Descending => key > watermark,
315        }
316    }
317
318    pub fn datum_filter_by_watermark(
319        &self,
320        watermark_col_in_pk: impl ToDatumRef,
321        watermark: impl ToDatumRef,
322        order_type: OrderType,
323    ) -> bool {
324        let watermark_col_in_pk = watermark_col_in_pk.to_datum_ref();
325        let watermark = watermark.to_datum_ref();
326        match self {
327            WatermarkDirection::Ascending => {
328                // watermark_col_in_pk < watermark
329                cmp_datum(watermark_col_in_pk, watermark, order_type).is_lt()
330            }
331            WatermarkDirection::Descending => {
332                //  watermark_col_in_pk > watermark
333                cmp_datum(watermark_col_in_pk, watermark, order_type).is_gt()
334            }
335        }
336    }
337
338    pub fn is_ascending(&self) -> bool {
339        match self {
340            WatermarkDirection::Ascending => true,
341            WatermarkDirection::Descending => false,
342        }
343    }
344}
345
/// A watermark value together with the set of vnodes it applies to.
#[derive(Clone, Debug, PartialEq, EstimateSize)]
pub struct VnodeWatermark {
    /// Bitmap in which a set bit marks a vnode covered by `watermark`.
    vnode_bitmap: Arc<Bitmap>,
    /// Serialized watermark value.
    watermark: Bytes,
}
351
352impl VnodeWatermark {
353    pub fn new(vnode_bitmap: Arc<Bitmap>, watermark: Bytes) -> Self {
354        Self {
355            vnode_bitmap,
356            watermark,
357        }
358    }
359
360    pub fn vnode_bitmap(&self) -> &Bitmap {
361        &self.vnode_bitmap
362    }
363
364    /// Vnode count derived from the bitmap.
365    pub fn vnode_count(&self) -> usize {
366        self.vnode_bitmap.len()
367    }
368
369    pub fn watermark(&self) -> &Bytes {
370        &self.watermark
371    }
372
373    pub fn to_protobuf(&self) -> PbVnodeWatermark {
374        self.into()
375    }
376}
377
378impl From<PbVnodeWatermark> for VnodeWatermark {
379    fn from(pb: PbVnodeWatermark) -> Self {
380        Self {
381            vnode_bitmap: Arc::new(Bitmap::from(pb.vnode_bitmap.as_ref().unwrap())),
382            watermark: Bytes::from(pb.watermark),
383        }
384    }
385}
386
387impl From<&PbVnodeWatermark> for VnodeWatermark {
388    fn from(pb: &PbVnodeWatermark) -> Self {
389        Self {
390            vnode_bitmap: Arc::new(Bitmap::from(pb.vnode_bitmap.as_ref().unwrap())),
391            watermark: Bytes::from(pb.watermark.clone()),
392        }
393    }
394}
395
396impl From<VnodeWatermark> for PbVnodeWatermark {
397    fn from(watermark: VnodeWatermark) -> Self {
398        Self {
399            watermark: watermark.watermark.into(),
400            vnode_bitmap: Some(watermark.vnode_bitmap.to_protobuf()),
401        }
402    }
403}
404
405impl From<&VnodeWatermark> for PbVnodeWatermark {
406    fn from(watermark: &VnodeWatermark) -> Self {
407        Self {
408            watermark: watermark.watermark.to_vec(),
409            vnode_bitmap: Some(watermark.vnode_bitmap.to_protobuf()),
410        }
411    }
412}
413
/// Watermarks grouped by epoch, together with their direction and serde type.
#[derive(Clone, Debug, PartialEq)]
pub struct TableWatermarks {
    /// Per-epoch vnode watermarks.
    // later epoch at the back
    pub watermarks: Vec<(HummockEpoch, Arc<[VnodeWatermark]>)>,
    /// Whether the watermarks are ascending or descending.
    pub direction: WatermarkDirection,
    /// Serde type of the watermarks; stored in protobuf as the `is_non_pk_prefix` flag.
    pub watermark_type: WatermarkSerdeType,
}
421
422impl TableWatermarks {
423    pub fn single_epoch(
424        epoch: HummockEpoch,
425        watermarks: Vec<VnodeWatermark>,
426        direction: WatermarkDirection,
427        watermark_type: WatermarkSerdeType,
428    ) -> Self {
429        let mut this = Self {
430            direction,
431            watermarks: Vec::new(),
432            watermark_type,
433        };
434        this.add_new_epoch_watermarks(epoch, watermarks.into(), direction, watermark_type);
435        this
436    }
437
438    pub fn add_new_epoch_watermarks(
439        &mut self,
440        epoch: HummockEpoch,
441        watermarks: Arc<[VnodeWatermark]>,
442        direction: WatermarkDirection,
443        watermark_type: WatermarkSerdeType,
444    ) {
445        assert_eq!(self.direction, direction);
446        assert_eq!(self.watermark_type, watermark_type);
447
448        if let Some((prev_epoch, _)) = self.watermarks.last() {
449            assert!(*prev_epoch < epoch);
450        }
451        if !watermarks.is_empty() {
452            let vnode_count = watermarks[0].vnode_count();
453            for watermark in &*watermarks {
454                assert_eq!(watermark.vnode_count(), vnode_count);
455            }
456            if let Some(existing_vnode_count) = self.vnode_count() {
457                assert_eq!(existing_vnode_count, vnode_count);
458            }
459        }
460        self.watermarks.push((epoch, watermarks));
461    }
462
463    /// Vnode count derived from existing watermarks. Returns `None` if there is no watermark.
464    fn vnode_count(&self) -> Option<usize> {
465        self.watermarks
466            .iter()
467            .flat_map(|(_, watermarks)| watermarks.as_ref())
468            .next()
469            .map(|w| w.vnode_count())
470    }
471
472    pub fn from_protobuf(pb: &PbTableWatermarks) -> Self {
473        Self {
474            watermarks: pb
475                .epoch_watermarks
476                .iter()
477                .map(|epoch_watermark| {
478                    let epoch = epoch_watermark.epoch;
479                    let watermarks = epoch_watermark
480                        .watermarks
481                        .iter()
482                        .map(VnodeWatermark::from)
483                        .collect_vec();
484                    (epoch, Arc::from(watermarks))
485                })
486                .collect(),
487            direction: if pb.is_ascending {
488                WatermarkDirection::Ascending
489            } else {
490                WatermarkDirection::Descending
491            },
492            watermark_type: if pb.is_non_pk_prefix {
493                WatermarkSerdeType::NonPkPrefix
494            } else {
495                WatermarkSerdeType::PkPrefix
496            },
497        }
498    }
499}
500
501pub fn merge_multiple_new_table_watermarks(
502    table_watermarks_list: impl IntoIterator<Item = HashMap<TableId, TableWatermarks>>,
503) -> HashMap<TableId, TableWatermarks> {
504    #[allow(clippy::type_complexity)]
505    let mut ret: HashMap<
506        TableId,
507        (
508            WatermarkDirection,
509            BTreeMap<u64, Vec<VnodeWatermark>>,
510            WatermarkSerdeType,
511        ),
512    > = HashMap::new();
513    for table_watermarks in table_watermarks_list {
514        for (table_id, new_table_watermarks) in table_watermarks {
515            let epoch_watermarks = match ret.entry(table_id) {
516                Entry::Occupied(entry) => {
517                    let (direction, epoch_watermarks, watermark_type) = entry.into_mut();
518                    assert_eq!(&new_table_watermarks.direction, direction);
519                    assert_eq!(&new_table_watermarks.watermark_type, watermark_type);
520                    epoch_watermarks
521                }
522                Entry::Vacant(entry) => {
523                    let (_, epoch_watermarks, _) = entry.insert((
524                        new_table_watermarks.direction,
525                        BTreeMap::new(),
526                        new_table_watermarks.watermark_type,
527                    ));
528                    epoch_watermarks
529                }
530            };
531            for (new_epoch, new_epoch_watermarks) in new_table_watermarks.watermarks {
532                epoch_watermarks
533                    .entry(new_epoch)
534                    .or_insert_with(Vec::new)
535                    .extend(new_epoch_watermarks.iter().cloned());
536            }
537        }
538    }
539    ret.into_iter()
540        .map(
541            |(table_id, (direction, epoch_watermarks, watermark_type))| {
542                (
543                    table_id,
544                    TableWatermarks {
545                        direction,
546                        // ordered from earlier epoch to later epoch
547                        watermarks: epoch_watermarks
548                            .into_iter()
549                            .map(|(epoch, watermarks)| (epoch, Arc::from(watermarks)))
550                            .collect(),
551                        watermark_type,
552                    },
553                )
554            },
555        )
556        .collect()
557}
558
559impl TableWatermarks {
560    pub fn apply_new_table_watermarks(&mut self, newly_added_watermarks: &TableWatermarks) {
561        assert_eq!(self.direction, newly_added_watermarks.direction);
562        assert!(self.watermarks.iter().map(|(epoch, _)| epoch).is_sorted());
563        assert!(
564            newly_added_watermarks
565                .watermarks
566                .iter()
567                .map(|(epoch, _)| epoch)
568                .is_sorted()
569        );
570        // ensure that the newly added watermarks have a later epoch than the previous latest epoch.
571        if let Some((prev_last_epoch, _)) = self.watermarks.last()
572            && let Some((new_first_epoch, _)) = newly_added_watermarks.watermarks.first()
573        {
574            assert!(prev_last_epoch < new_first_epoch);
575        }
576        self.watermarks.extend(
577            newly_added_watermarks
578                .watermarks
579                .iter()
580                .map(|(epoch, new_watermarks)| (*epoch, new_watermarks.clone())),
581        );
582    }
583
    /// Drops watermarks of epochs below `safe_epoch` while preserving, for every vnode,
    /// the newest watermark that was recorded below `safe_epoch`.
    ///
    /// Returns immediately when there are no watermarks or when the earliest stored
    /// epoch is already `>= safe_epoch`. Otherwise:
    ///
    /// 1. Epochs `>= safe_epoch` are kept unchanged, and the vnodes they cover are
    ///    recorded.
    /// 2. Older epochs are then visited from newest to oldest until every vnode is
    ///    covered. For each, only the vnodes not covered yet are kept, and the resulting
    ///    watermarks are stored under `safe_epoch` (merged into the existing
    ///    `safe_epoch` entry if there is one).
    /// 3. Whatever older epochs remain are discarded.
    pub fn clear_stale_epoch_watermark(&mut self, safe_epoch: u64) {
        match self.watermarks.first() {
            None => {
                // return on empty watermark
                return;
            }
            Some((earliest_epoch, _)) => {
                if *earliest_epoch >= safe_epoch {
                    // No stale epoch watermark needs to be cleared.
                    return;
                }
            }
        }
        debug!("clear stale table watermark below epoch {}", safe_epoch);
        // Built from later epoch to earlier epoch; reversed before being stored.
        let mut result_epoch_watermark = Vec::with_capacity(self.watermarks.len());
        // Vnodes that already have a watermark in `result_epoch_watermark`.
        let mut set_vnode: HashSet<VirtualNode> = HashSet::new();
        let mut vnode_count: Option<usize> = None; // lazy initialized on first occurrence of vnode watermark
        // Phase 1: move every epoch >= safe_epoch over unchanged.
        while let Some((epoch, _)) = self.watermarks.last() {
            if *epoch >= safe_epoch {
                let (epoch, watermarks) = self.watermarks.pop().expect("have check Some");
                for watermark in watermarks.as_ref() {
                    vnode_count.get_or_insert_with(|| watermark.vnode_count());
                    for vnode in watermark.vnode_bitmap.iter_vnodes() {
                        set_vnode.insert(vnode);
                    }
                }
                result_epoch_watermark.push((epoch, watermarks));
            } else {
                break;
            }
        }
        // Phase 2: walk the stale epochs, newest first, until all vnodes are covered
        // (or no epoch is left), keeping only watermarks for vnodes not covered yet.
        while vnode_count != Some(set_vnode.len())
            && let Some((_, watermarks)) = self.watermarks.pop()
        {
            let mut new_vnode_watermarks = Vec::new();
            for vnode_watermark in watermarks.as_ref() {
                let mut new_set_vnode = Vec::new();
                vnode_count.get_or_insert_with(|| vnode_watermark.vnode_count());
                for vnode in vnode_watermark.vnode_bitmap.iter_vnodes() {
                    // `insert` returns true only for vnodes not covered by a later epoch.
                    if set_vnode.insert(vnode) {
                        new_set_vnode.push(vnode);
                    }
                }
                if !new_set_vnode.is_empty() {
                    let mut builder = BitmapBuilder::zeroed(vnode_watermark.vnode_count());
                    for vnode in new_set_vnode {
                        builder.set(vnode.to_index(), true);
                    }
                    let bitmap = Arc::new(builder.finish());
                    new_vnode_watermarks.push(VnodeWatermark {
                        vnode_bitmap: bitmap,
                        watermark: vnode_watermark.watermark.clone(),
                    })
                }
            }
            if !new_vnode_watermarks.is_empty() {
                // Surviving stale watermarks are re-labelled with `safe_epoch`; append to
                // the `safe_epoch` entry if the last pushed entry already has that epoch.
                if let Some((last_epoch, last_watermarks)) = result_epoch_watermark.last_mut()
                    && *last_epoch == safe_epoch
                {
                    *last_watermarks = Arc::from(
                        last_watermarks
                            .iter()
                            .cloned()
                            .chain(new_vnode_watermarks.into_iter())
                            .collect_vec(),
                    );
                } else {
                    result_epoch_watermark.push((safe_epoch, Arc::from(new_vnode_watermarks)));
                }
            }
        }
        // epoch watermark are added from later epoch to earlier epoch.
        // reverse to ensure that earlier epochs are at the front
        result_epoch_watermark.reverse();
        // Epochs must be strictly increasing after the rewrite.
        assert!(
            result_epoch_watermark
                .is_sorted_by(|(first_epoch, _), (second_epoch, _)| { first_epoch < second_epoch })
        );
        // Any epochs still left in `self.watermarks` are dropped here.
        *self = TableWatermarks {
            watermarks: result_epoch_watermark,
            direction: self.direction,
            watermark_type: self.watermark_type,
        }
    }
668}
669
670impl TableWatermarks {
671    pub fn estimated_encode_len(&self) -> usize {
672        self.watermarks.len() * size_of::<HummockEpoch>()
673            + self
674                .watermarks
675                .iter()
676                .map(|(_, watermarks)| {
677                    watermarks
678                        .iter()
679                        .map(|watermark| watermark.estimated_size())
680                        .sum::<usize>()
681                })
682                .sum::<usize>()
683            + size_of::<bool>() // for direction
684    }
685
686    pub fn to_protobuf(&self) -> PbTableWatermarks {
687        self.into()
688    }
689}
690
691impl From<&PbTableWatermarks> for TableWatermarks {
692    fn from(pb: &PbTableWatermarks) -> Self {
693        Self {
694            watermarks: pb
695                .epoch_watermarks
696                .iter()
697                .map(|epoch_watermark| {
698                    let epoch = epoch_watermark.epoch;
699                    let watermarks = epoch_watermark
700                        .watermarks
701                        .iter()
702                        .map(VnodeWatermark::from)
703                        .collect();
704                    (epoch, watermarks)
705                })
706                .collect(),
707            direction: if pb.is_ascending {
708                WatermarkDirection::Ascending
709            } else {
710                WatermarkDirection::Descending
711            },
712            watermark_type: if pb.is_non_pk_prefix {
713                WatermarkSerdeType::NonPkPrefix
714            } else {
715                WatermarkSerdeType::PkPrefix
716            },
717        }
718    }
719}
720
721impl From<&TableWatermarks> for PbTableWatermarks {
722    fn from(table_watermarks: &TableWatermarks) -> Self {
723        Self {
724            epoch_watermarks: table_watermarks
725                .watermarks
726                .iter()
727                .map(|(epoch, watermarks)| PbEpochNewWatermarks {
728                    watermarks: watermarks.iter().map(|wm| wm.into()).collect(),
729                    epoch: *epoch,
730                })
731                .collect(),
732            is_ascending: match table_watermarks.direction {
733                WatermarkDirection::Ascending => true,
734                WatermarkDirection::Descending => false,
735            },
736            is_non_pk_prefix: match table_watermarks.watermark_type {
737                WatermarkSerdeType::NonPkPrefix => true,
738                WatermarkSerdeType::PkPrefix => false,
739            },
740        }
741    }
742}
743
744impl From<PbTableWatermarks> for TableWatermarks {
745    fn from(pb: PbTableWatermarks) -> Self {
746        Self {
747            watermarks: pb
748                .epoch_watermarks
749                .into_iter()
750                .map(|epoch_watermark| {
751                    let epoch = epoch_watermark.epoch;
752                    let watermarks = epoch_watermark
753                        .watermarks
754                        .into_iter()
755                        .map(VnodeWatermark::from)
756                        .collect();
757                    (epoch, watermarks)
758                })
759                .collect(),
760            direction: if pb.is_ascending {
761                WatermarkDirection::Ascending
762            } else {
763                WatermarkDirection::Descending
764            },
765            watermark_type: if pb.is_non_pk_prefix {
766                WatermarkSerdeType::NonPkPrefix
767            } else {
768                WatermarkSerdeType::PkPrefix
769            },
770        }
771    }
772}
773
774impl From<TableWatermarks> for PbTableWatermarks {
775    fn from(table_watermarks: TableWatermarks) -> Self {
776        Self {
777            epoch_watermarks: table_watermarks
778                .watermarks
779                .into_iter()
780                .map(|(epoch, watermarks)| PbEpochNewWatermarks {
781                    watermarks: watermarks.iter().map(PbVnodeWatermark::from).collect(),
782                    epoch,
783                })
784                .collect(),
785            is_ascending: match table_watermarks.direction {
786                WatermarkDirection::Ascending => true,
787                WatermarkDirection::Descending => false,
788            },
789            is_non_pk_prefix: match table_watermarks.watermark_type {
790                WatermarkSerdeType::NonPkPrefix => true,
791                WatermarkSerdeType::PkPrefix => false,
792            },
793        }
794    }
795}
796
/// Serde type of a table watermark.
///
/// Stored in protobuf as the boolean `is_non_pk_prefix`: `NonPkPrefix` maps to `true`
/// and `PkPrefix` to `false`.
// NOTE(review): presumably distinguishes whether the watermark column is a prefix of
// the primary key — confirm against the producers of these watermarks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WatermarkSerdeType {
    PkPrefix,
    NonPkPrefix,
}
802
803#[cfg(test)]
804mod tests {
805    use std::collections::Bound::Included;
806    use std::collections::{Bound, HashMap};
807    use std::ops::Bound::Excluded;
808    use std::sync::Arc;
809    use std::vec;
810
811    use bytes::Bytes;
812    use itertools::Itertools;
813    use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
814    use risingwave_common::catalog::TableId;
815    use risingwave_common::hash::VirtualNode;
816    use risingwave_common::util::epoch::{EpochExt, test_epoch};
817    use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo};
818
819    use crate::compaction_group::StaticCompactionGroupId;
820    use crate::key::{TableKeyRange, is_empty_key_range, prefixed_range_with_vnode};
821    use crate::table_watermark::{
822        PkPrefixTableWatermarksIndex, TableWatermarks, VnodeWatermark, WatermarkDirection,
823        WatermarkSerdeType, merge_multiple_new_table_watermarks,
824    };
825    use crate::version::HummockVersion;
826
827    fn build_bitmap(vnodes: impl IntoIterator<Item = usize>) -> Arc<Bitmap> {
828        let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
829        for vnode in vnodes {
830            builder.set(vnode, true);
831        }
832        Arc::new(builder.finish())
833    }
834
835    #[test]
836    fn test_apply_new_table_watermark() {
837        let epoch1 = test_epoch(1);
838        let direction = WatermarkDirection::Ascending;
839        let watermark1 = Bytes::from("watermark1");
840        let watermark2 = Bytes::from("watermark2");
841        let watermark3 = Bytes::from("watermark3");
842        let watermark4 = Bytes::from("watermark4");
843        let watermark_type = WatermarkSerdeType::PkPrefix;
844        let mut table_watermarks = TableWatermarks::single_epoch(
845            epoch1,
846            vec![VnodeWatermark::new(
847                build_bitmap(vec![0, 1, 2]),
848                watermark1.clone(),
849            )],
850            direction,
851            watermark_type,
852        );
853        let epoch2 = epoch1.next_epoch();
854        table_watermarks.add_new_epoch_watermarks(
855            epoch2,
856            vec![VnodeWatermark::new(
857                build_bitmap(vec![0, 1, 2, 3]),
858                watermark2.clone(),
859            )]
860            .into(),
861            direction,
862            watermark_type,
863        );
864
865        let mut table_watermark_checkpoint = table_watermarks.clone();
866
867        let epoch3 = epoch2.next_epoch();
868        let mut second_table_watermark = TableWatermarks::single_epoch(
869            epoch3,
870            vec![VnodeWatermark::new(
871                build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
872                watermark3.clone(),
873            )],
874            direction,
875            watermark_type,
876        );
877        table_watermarks.add_new_epoch_watermarks(
878            epoch3,
879            vec![VnodeWatermark::new(
880                build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
881                watermark3.clone(),
882            )]
883            .into(),
884            direction,
885            watermark_type,
886        );
887        let epoch4 = epoch3.next_epoch();
888        let epoch5 = epoch4.next_epoch();
889        table_watermarks.add_new_epoch_watermarks(
890            epoch5,
891            vec![VnodeWatermark::new(
892                build_bitmap(vec![0, 3, 4]),
893                watermark4.clone(),
894            )]
895            .into(),
896            direction,
897            watermark_type,
898        );
899        second_table_watermark.add_new_epoch_watermarks(
900            epoch5,
901            vec![VnodeWatermark::new(
902                build_bitmap(vec![0, 3, 4]),
903                watermark4.clone(),
904            )]
905            .into(),
906            direction,
907            watermark_type,
908        );
909
910        table_watermark_checkpoint.apply_new_table_watermarks(&second_table_watermark);
911        assert_eq!(table_watermarks, table_watermark_checkpoint);
912    }
913
914    #[test]
915    fn test_clear_stale_epoch_watmermark() {
916        let epoch1 = test_epoch(1);
917        let direction = WatermarkDirection::Ascending;
918        let watermark1 = Bytes::from("watermark1");
919        let watermark2 = Bytes::from("watermark2");
920        let watermark3 = Bytes::from("watermark3");
921        let watermark4 = Bytes::from("watermark4");
922        let watermark_type = WatermarkSerdeType::PkPrefix;
923        let mut table_watermarks = TableWatermarks::single_epoch(
924            epoch1,
925            vec![VnodeWatermark::new(
926                build_bitmap(vec![0, 1, 2]),
927                watermark1.clone(),
928            )],
929            direction,
930            watermark_type,
931        );
932        let epoch2 = epoch1.next_epoch();
933        table_watermarks.add_new_epoch_watermarks(
934            epoch2,
935            vec![VnodeWatermark::new(
936                build_bitmap(vec![0, 1, 2, 3]),
937                watermark2.clone(),
938            )]
939            .into(),
940            direction,
941            watermark_type,
942        );
943        let epoch3 = epoch2.next_epoch();
944        table_watermarks.add_new_epoch_watermarks(
945            epoch3,
946            vec![VnodeWatermark::new(
947                build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
948                watermark3.clone(),
949            )]
950            .into(),
951            direction,
952            watermark_type,
953        );
954        let epoch4 = epoch3.next_epoch();
955        let epoch5 = epoch4.next_epoch();
956        table_watermarks.add_new_epoch_watermarks(
957            epoch5,
958            vec![VnodeWatermark::new(
959                build_bitmap(vec![0, 3, 4]),
960                watermark4.clone(),
961            )]
962            .into(),
963            direction,
964            watermark_type,
965        );
966
967        let mut table_watermarks_checkpoint = table_watermarks.clone();
968        table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch1);
969        assert_eq!(table_watermarks_checkpoint, table_watermarks);
970
971        table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch2);
972        assert_eq!(
973            table_watermarks_checkpoint,
974            TableWatermarks {
975                watermarks: vec![
976                    (
977                        epoch2,
978                        vec![VnodeWatermark::new(
979                            build_bitmap(vec![0, 1, 2, 3]),
980                            watermark2.clone(),
981                        )]
982                        .into()
983                    ),
984                    (
985                        epoch3,
986                        vec![VnodeWatermark::new(
987                            build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
988                            watermark3.clone(),
989                        )]
990                        .into()
991                    ),
992                    (
993                        epoch5,
994                        vec![VnodeWatermark::new(
995                            build_bitmap(vec![0, 3, 4]),
996                            watermark4.clone(),
997                        )]
998                        .into()
999                    )
1000                ],
1001                direction,
1002                watermark_type,
1003            }
1004        );
1005
1006        table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch3);
1007        assert_eq!(
1008            table_watermarks_checkpoint,
1009            TableWatermarks {
1010                watermarks: vec![
1011                    (
1012                        epoch3,
1013                        vec![VnodeWatermark::new(
1014                            build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
1015                            watermark3.clone(),
1016                        )]
1017                        .into()
1018                    ),
1019                    (
1020                        epoch5,
1021                        vec![VnodeWatermark::new(
1022                            build_bitmap(vec![0, 3, 4]),
1023                            watermark4.clone(),
1024                        )]
1025                        .into()
1026                    )
1027                ],
1028                direction,
1029                watermark_type,
1030            }
1031        );
1032
1033        table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch4);
1034        assert_eq!(
1035            table_watermarks_checkpoint,
1036            TableWatermarks {
1037                watermarks: vec![
1038                    (
1039                        epoch4,
1040                        vec![VnodeWatermark::new(
1041                            build_bitmap((1..3).chain(5..VirtualNode::COUNT_FOR_TEST)),
1042                            watermark3.clone()
1043                        )]
1044                        .into()
1045                    ),
1046                    (
1047                        epoch5,
1048                        vec![VnodeWatermark::new(
1049                            build_bitmap(vec![0, 3, 4]),
1050                            watermark4.clone(),
1051                        )]
1052                        .into()
1053                    )
1054                ],
1055                direction,
1056                watermark_type,
1057            }
1058        );
1059
1060        table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch5);
1061        assert_eq!(
1062            table_watermarks_checkpoint,
1063            TableWatermarks {
1064                watermarks: vec![(
1065                    epoch5,
1066                    vec![
1067                        VnodeWatermark::new(build_bitmap(vec![0, 3, 4]), watermark4.clone()),
1068                        VnodeWatermark::new(
1069                            build_bitmap((1..3).chain(5..VirtualNode::COUNT_FOR_TEST)),
1070                            watermark3.clone()
1071                        )
1072                    ]
1073                    .into()
1074                )],
1075                direction,
1076                watermark_type,
1077            }
1078        );
1079    }
1080
1081    #[test]
1082    fn test_merge_multiple_new_table_watermarks() {
1083        fn epoch_new_watermark(epoch: u64, bitmaps: Vec<&Bitmap>) -> (u64, Arc<[VnodeWatermark]>) {
1084            (
1085                epoch,
1086                bitmaps
1087                    .into_iter()
1088                    .map(|bitmap| VnodeWatermark {
1089                        watermark: Bytes::from(vec![1, 2, epoch as _]),
1090                        vnode_bitmap: Arc::new(bitmap.clone()),
1091                    })
1092                    .collect_vec()
1093                    .into(),
1094            )
1095        }
1096        fn build_table_watermark(
1097            vnodes: impl IntoIterator<Item = usize>,
1098            epochs: impl IntoIterator<Item = u64>,
1099        ) -> TableWatermarks {
1100            let bitmap = build_bitmap(vnodes);
1101            TableWatermarks {
1102                watermarks: epochs
1103                    .into_iter()
1104                    .map(|epoch: u64| epoch_new_watermark(epoch, vec![&bitmap]))
1105                    .collect(),
1106                direction: WatermarkDirection::Ascending,
1107                watermark_type: WatermarkSerdeType::PkPrefix,
1108            }
1109        }
1110        let table1_watermark1 = build_table_watermark(0..3, vec![1, 2, 4]);
1111        let table1_watermark2 = build_table_watermark(4..6, vec![1, 2, 5]);
1112        let table2_watermark = build_table_watermark(0..4, 1..3);
1113        let table3_watermark = build_table_watermark(0..4, 3..5);
1114        let mut first = HashMap::new();
1115        first.insert(TableId::new(1), table1_watermark1);
1116        first.insert(TableId::new(2), table2_watermark.clone());
1117        let mut second = HashMap::new();
1118        second.insert(TableId::new(1), table1_watermark2);
1119        second.insert(TableId::new(3), table3_watermark.clone());
1120        let result = merge_multiple_new_table_watermarks(vec![first, second]);
1121        let mut expected = HashMap::new();
1122        expected.insert(
1123            TableId::new(1),
1124            TableWatermarks {
1125                watermarks: vec![
1126                    epoch_new_watermark(1, vec![&build_bitmap(0..3), &build_bitmap(4..6)]),
1127                    epoch_new_watermark(2, vec![&build_bitmap(0..3), &build_bitmap(4..6)]),
1128                    epoch_new_watermark(4, vec![&build_bitmap(0..3)]),
1129                    epoch_new_watermark(5, vec![&build_bitmap(4..6)]),
1130                ],
1131                direction: WatermarkDirection::Ascending,
1132                watermark_type: WatermarkSerdeType::PkPrefix,
1133            },
1134        );
1135        expected.insert(TableId::new(2), table2_watermark);
1136        expected.insert(TableId::new(3), table3_watermark);
1137        assert_eq!(result, expected);
1138    }
1139
    // Epoch passed as `Some(COMMITTED_EPOCH)` when `build_and_test_watermark_index` creates the
    // index.
    const COMMITTED_EPOCH: u64 = test_epoch(1);
    // Epoch at which `build_and_test_watermark_index` registers `watermark1`.
    const EPOCH1: u64 = test_epoch(2);
    // Epoch at which `build_and_test_watermark_index` registers `watermark2`.
    const EPOCH2: u64 = test_epoch(3);
    // Vnode that `build_watermark_range` prefixes every test key range with.
    const TEST_SINGLE_VNODE: VirtualNode = VirtualNode::from_index(1);
1144
1145    fn build_watermark_range(
1146        direction: WatermarkDirection,
1147        (low, high): (Bound<Bytes>, Bound<Bytes>),
1148    ) -> TableKeyRange {
1149        let range = match direction {
1150            WatermarkDirection::Ascending => (low, high),
1151            WatermarkDirection::Descending => (high, low),
1152        };
1153        prefixed_range_with_vnode(range, TEST_SINGLE_VNODE)
1154    }
1155
1156    /// Build and return a watermark index with the following watermarks
1157    /// EPOCH1 bitmap(0, 1, 2, 3) watermark1
1158    /// EPOCH2 bitmap(1, 2, 3, 4) watermark2
1159    fn build_and_test_watermark_index(
1160        direction: WatermarkDirection,
1161        watermark1: Bytes,
1162        watermark2: Bytes,
1163        watermark3: Bytes,
1164    ) -> PkPrefixTableWatermarksIndex {
1165        let mut index = PkPrefixTableWatermarksIndex::new(
1166            direction,
1167            EPOCH1,
1168            vec![VnodeWatermark::new(build_bitmap(0..4), watermark1.clone())],
1169            Some(COMMITTED_EPOCH),
1170        );
1171        index.add_epoch_watermark(
1172            EPOCH2,
1173            vec![VnodeWatermark::new(build_bitmap(1..5), watermark2.clone())].into(),
1174            direction,
1175        );
1176
1177        assert_eq!(
1178            index.read_watermark(VirtualNode::from_index(0), EPOCH1),
1179            Some(watermark1.clone())
1180        );
1181        assert_eq!(
1182            index.read_watermark(VirtualNode::from_index(1), EPOCH1),
1183            Some(watermark1.clone())
1184        );
1185        assert_eq!(
1186            index.read_watermark(VirtualNode::from_index(4), EPOCH1),
1187            None
1188        );
1189        assert_eq!(
1190            index.read_watermark(VirtualNode::from_index(0), EPOCH2),
1191            Some(watermark1.clone())
1192        );
1193        assert_eq!(
1194            index.read_watermark(VirtualNode::from_index(1), EPOCH2),
1195            Some(watermark2.clone())
1196        );
1197        assert_eq!(
1198            index.read_watermark(VirtualNode::from_index(4), EPOCH2),
1199            Some(watermark2.clone())
1200        );
1201        assert_eq!(
1202            index.latest_watermark(VirtualNode::from_index(0)),
1203            Some(watermark1.clone())
1204        );
1205        assert_eq!(
1206            index.latest_watermark(VirtualNode::from_index(1)),
1207            Some(watermark2.clone())
1208        );
1209        assert_eq!(
1210            index.latest_watermark(VirtualNode::from_index(4)),
1211            Some(watermark2.clone())
1212        );
1213
1214        // watermark is watermark2
1215        let check_watermark_range =
1216            |query_range: (Bound<Bytes>, Bound<Bytes>),
1217             output_range: Option<(Bound<Bytes>, Bound<Bytes>)>| {
1218                let mut range = build_watermark_range(direction, query_range);
1219                index.rewrite_range_with_table_watermark(EPOCH2, &mut range);
1220                if let Some(output_range) = output_range {
1221                    assert_eq!(range, build_watermark_range(direction, output_range));
1222                } else {
1223                    assert!(is_empty_key_range(&range));
1224                }
1225            };
1226
1227        // test read from single vnode and truncate begin key range
1228        check_watermark_range(
1229            (Included(watermark1.clone()), Excluded(watermark3.clone())),
1230            Some((Included(watermark2.clone()), Excluded(watermark3.clone()))),
1231        );
1232
1233        // test read from single vnode and begin key right at watermark
1234        check_watermark_range(
1235            (Included(watermark2.clone()), Excluded(watermark3.clone())),
1236            Some((Included(watermark2.clone()), Excluded(watermark3.clone()))),
1237        );
1238        check_watermark_range(
1239            (Excluded(watermark2.clone()), Excluded(watermark3.clone())),
1240            Some((Excluded(watermark2.clone()), Excluded(watermark3.clone()))),
1241        );
1242
1243        // test read from single vnode and end key right at watermark
1244        check_watermark_range(
1245            (Excluded(watermark1.clone()), Excluded(watermark2.clone())),
1246            None,
1247        );
1248        check_watermark_range(
1249            (Excluded(watermark1.clone()), Included(watermark2.clone())),
1250            Some((Included(watermark2.clone()), Included(watermark2.clone()))),
1251        );
1252
1253        index
1254    }
1255
1256    #[test]
1257    fn test_watermark_index_ascending() {
1258        let watermark1 = Bytes::from_static(b"watermark1");
1259        let watermark2 = Bytes::from_static(b"watermark2");
1260        let watermark3 = Bytes::from_static(b"watermark3");
1261        build_and_test_watermark_index(
1262            WatermarkDirection::Ascending,
1263            watermark1.clone(),
1264            watermark2.clone(),
1265            watermark3.clone(),
1266        );
1267    }
1268
1269    #[test]
1270    fn test_watermark_index_descending() {
1271        let watermark1 = Bytes::from_static(b"watermark254");
1272        let watermark2 = Bytes::from_static(b"watermark253");
1273        let watermark3 = Bytes::from_static(b"watermark252");
1274        build_and_test_watermark_index(
1275            WatermarkDirection::Descending,
1276            watermark1.clone(),
1277            watermark2.clone(),
1278            watermark3.clone(),
1279        );
1280    }
1281
1282    #[test]
1283    fn test_apply_committed_index() {
1284        let watermark1 = Bytes::from_static(b"watermark1");
1285        let watermark2 = Bytes::from_static(b"watermark2");
1286        let watermark3 = Bytes::from_static(b"watermark3");
1287        let mut index = build_and_test_watermark_index(
1288            WatermarkDirection::Ascending,
1289            watermark1.clone(),
1290            watermark2.clone(),
1291            watermark3.clone(),
1292        );
1293
1294        let test_table_id = TableId::from(233);
1295
1296        let mut version = HummockVersion::from_rpc_protobuf(&PbHummockVersion {
1297            state_table_info: HashMap::from_iter([(
1298                test_table_id.table_id,
1299                StateTableInfo {
1300                    committed_epoch: EPOCH1,
1301                    compaction_group_id: StaticCompactionGroupId::StateDefault as _,
1302                },
1303            )]),
1304            ..Default::default()
1305        });
1306        version.table_watermarks.insert(
1307            test_table_id,
1308            TableWatermarks {
1309                watermarks: vec![(
1310                    EPOCH1,
1311                    vec![VnodeWatermark {
1312                        watermark: watermark1.clone(),
1313                        vnode_bitmap: build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
1314                    }]
1315                    .into(),
1316                )],
1317                direction: WatermarkDirection::Ascending,
1318                watermark_type: WatermarkSerdeType::PkPrefix,
1319            }
1320            .into(),
1321        );
1322        index.apply_committed_watermarks(
1323            version
1324                .table_watermarks
1325                .get(&test_table_id)
1326                .unwrap()
1327                .clone(),
1328            EPOCH1,
1329        );
1330        assert_eq!(EPOCH1, index.committed_epoch.unwrap());
1331        assert_eq!(EPOCH2, index.latest_epoch);
1332        for vnode in 0..VirtualNode::COUNT_FOR_TEST {
1333            let vnode = VirtualNode::from_index(vnode);
1334            if (1..5).contains(&vnode.to_index()) {
1335                assert_eq!(watermark1, index.read_watermark(vnode, EPOCH1).unwrap());
1336                assert_eq!(watermark2, index.read_watermark(vnode, EPOCH2).unwrap());
1337            } else {
1338                assert_eq!(watermark1, index.read_watermark(vnode, EPOCH1).unwrap());
1339            }
1340        }
1341    }
1342
1343    #[test]
1344    fn test_filter_regress_watermark() {
1345        let watermark1 = Bytes::from_static(b"watermark1");
1346        let watermark2 = Bytes::from_static(b"watermark2");
1347        let watermark3 = Bytes::from_static(b"watermark3");
1348        let index = build_and_test_watermark_index(
1349            WatermarkDirection::Ascending,
1350            watermark1.clone(),
1351            watermark2.clone(),
1352            watermark3.clone(),
1353        );
1354
1355        let mut new_watermarks = vec![
1356            // Partial regress
1357            VnodeWatermark {
1358                vnode_bitmap: build_bitmap(0..2),
1359                watermark: watermark1.clone(),
1360            },
1361            // All not regress
1362            VnodeWatermark {
1363                vnode_bitmap: build_bitmap(2..4),
1364                watermark: watermark3.clone(),
1365            },
1366            // All regress
1367            VnodeWatermark {
1368                vnode_bitmap: build_bitmap(4..5),
1369                watermark: watermark1.clone(),
1370            },
1371            // All newly set vnode
1372            VnodeWatermark {
1373                vnode_bitmap: build_bitmap(5..6),
1374                watermark: watermark3.clone(),
1375            },
1376        ];
1377
1378        index.filter_regress_watermarks(&mut new_watermarks);
1379
1380        assert_eq!(
1381            new_watermarks,
1382            vec![
1383                VnodeWatermark {
1384                    vnode_bitmap: build_bitmap(0..1),
1385                    watermark: watermark1,
1386                },
1387                VnodeWatermark {
1388                    vnode_bitmap: build_bitmap(2..4),
1389                    watermark: watermark3.clone(),
1390                },
1391                VnodeWatermark {
1392                    vnode_bitmap: build_bitmap(5..6),
1393                    watermark: watermark3,
1394                },
1395            ]
1396        );
1397    }
1398}