risingwave_hummock_sdk/
table_watermark.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
17use std::fmt::Display;
18use std::mem::size_of;
19use std::ops::Bound::{Excluded, Included, Unbounded};
20use std::sync::Arc;
21
22use bytes::Bytes;
23use itertools::Itertools;
24use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
25use risingwave_common::catalog::TableId;
26use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
27use risingwave_common::types::ToDatumRef;
28use risingwave_common::util::sort_util::{OrderType, cmp_datum};
29use risingwave_common_estimate_size::EstimateSize;
30use risingwave_pb::hummock::table_watermarks::PbEpochNewWatermarks;
31use risingwave_pb::hummock::{PbVnodeWatermark, TableWatermarks as PbTableWatermarks};
32use tracing::{debug, warn};
33
34use crate::HummockEpoch;
35use crate::key::{TableKey, TableKeyRange, prefix_slice_with_vnode, vnode};
36
37#[derive(Clone)]
38pub struct ReadTableWatermark {
39    pub direction: WatermarkDirection,
40    pub vnode_watermarks: BTreeMap<VirtualNode, Bytes>,
41}
42
43#[derive(Clone)]
44pub struct TableWatermarksIndex {
45    pub watermark_direction: WatermarkDirection,
46    // later epoch at the back
47    pub staging_watermarks: VecDeque<(HummockEpoch, Arc<[VnodeWatermark]>)>,
48    pub committed_watermarks: Option<Arc<TableWatermarks>>,
49    latest_epoch: HummockEpoch,
50    committed_epoch: Option<HummockEpoch>,
51}
52
53impl TableWatermarksIndex {
54    pub fn new(
55        watermark_direction: WatermarkDirection,
56        first_epoch: HummockEpoch,
57        first_vnode_watermark: Vec<VnodeWatermark>,
58        committed_epoch: Option<HummockEpoch>,
59    ) -> Self {
60        if let Some(committed_epoch) = committed_epoch {
61            assert!(first_epoch > committed_epoch);
62        }
63        Self {
64            watermark_direction,
65            staging_watermarks: VecDeque::from_iter([(
66                first_epoch,
67                Arc::from(first_vnode_watermark),
68            )]),
69            committed_watermarks: None,
70            latest_epoch: first_epoch,
71            committed_epoch,
72        }
73    }
74
75    pub fn new_committed(
76        committed_watermarks: Arc<TableWatermarks>,
77        committed_epoch: HummockEpoch,
78    ) -> Self {
79        Self {
80            watermark_direction: committed_watermarks.direction,
81            staging_watermarks: VecDeque::new(),
82            committed_epoch: Some(committed_epoch),
83            latest_epoch: committed_epoch,
84            committed_watermarks: Some(committed_watermarks),
85        }
86    }
87
88    pub fn read_watermark(&self, vnode: VirtualNode, epoch: HummockEpoch) -> Option<Bytes> {
89        // iterate from new epoch to old epoch
90        for (watermark_epoch, vnode_watermark_list) in self.staging_watermarks.iter().rev().chain(
91            self.committed_watermarks
92                .iter()
93                .flat_map(|watermarks| watermarks.watermarks.iter().rev()),
94        ) {
95            if *watermark_epoch > epoch {
96                continue;
97            }
98            for vnode_watermark in vnode_watermark_list.as_ref() {
99                if vnode_watermark.vnode_bitmap.is_set(vnode.to_index()) {
100                    return Some(vnode_watermark.watermark.clone());
101                }
102            }
103        }
104        None
105    }
106
107    pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
108        self.read_watermark(vnode, HummockEpoch::MAX)
109    }
110
111    pub fn rewrite_range_with_table_watermark(
112        &self,
113        epoch: HummockEpoch,
114        key_range: &mut TableKeyRange,
115    ) {
116        let vnode = vnode(key_range);
117        if let Some(watermark) = self.read_watermark(vnode, epoch) {
118            match self.watermark_direction {
119                WatermarkDirection::Ascending => {
120                    let overwrite_start_key = match &key_range.0 {
121                        Included(start_key) | Excluded(start_key) => {
122                            start_key.key_part() < watermark
123                        }
124                        Unbounded => true,
125                    };
126                    if overwrite_start_key {
127                        let watermark_key = TableKey(prefix_slice_with_vnode(vnode, &watermark));
128                        let fully_filtered = match &key_range.1 {
129                            Included(end_key) => end_key < &watermark_key,
130                            Excluded(end_key) => end_key <= &watermark_key,
131                            Unbounded => false,
132                        };
133                        if fully_filtered {
134                            key_range.1 = Excluded(watermark_key.clone());
135                        }
136                        key_range.0 = Included(watermark_key);
137                    }
138                }
139                WatermarkDirection::Descending => {
140                    let overwrite_end_key = match &key_range.1 {
141                        Included(end_key) | Excluded(end_key) => end_key.key_part() > watermark,
142                        Unbounded => true,
143                    };
144                    if overwrite_end_key {
145                        let watermark_key = TableKey(prefix_slice_with_vnode(vnode, &watermark));
146                        let fully_filtered = match &key_range.0 {
147                            Included(start_key) => start_key > &watermark_key,
148                            Excluded(start_key) => start_key >= &watermark_key,
149                            Unbounded => false,
150                        };
151                        if fully_filtered {
152                            *key_range = (Included(watermark_key.clone()), Excluded(watermark_key));
153                        } else {
154                            key_range.1 = Included(watermark_key);
155                        }
156                    }
157                }
158            }
159        }
160    }
161
162    pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
163        let mut ret = Vec::with_capacity(watermarks.len());
164        for watermark in watermarks.drain(..) {
165            let vnode_count = watermark.vnode_count();
166
167            let mut regress_vnodes = None;
168            for vnode in watermark.vnode_bitmap.iter_vnodes() {
169                if let Some(prev_watermark) = self.latest_watermark(vnode) {
170                    let is_regress = match self.direction() {
171                        WatermarkDirection::Ascending => prev_watermark > watermark.watermark,
172                        WatermarkDirection::Descending => prev_watermark < watermark.watermark,
173                    };
174                    if is_regress {
175                        warn!(
176                            "table watermark regress: {:?} {} {:?} {:?}",
177                            self.direction(),
178                            vnode,
179                            watermark.watermark,
180                            prev_watermark
181                        );
182                        regress_vnodes
183                            .get_or_insert_with(|| BitmapBuilder::zeroed(vnode_count))
184                            .set(vnode.to_index(), true);
185                    }
186                }
187            }
188            if let Some(regress_vnodes) = regress_vnodes {
189                let mut bitmap_builder = None;
190                for vnode in watermark.vnode_bitmap.iter_vnodes() {
191                    let vnode_index = vnode.to_index();
192                    if !regress_vnodes.is_set(vnode_index) {
193                        bitmap_builder
194                            .get_or_insert_with(|| BitmapBuilder::zeroed(vnode_count))
195                            .set(vnode_index, true);
196                    }
197                }
198                if let Some(bitmap_builder) = bitmap_builder {
199                    ret.push(VnodeWatermark::new(
200                        Arc::new(bitmap_builder.finish()),
201                        watermark.watermark,
202                    ));
203                }
204            } else {
205                // no vnode has regress watermark
206                ret.push(watermark);
207            }
208        }
209        *watermarks = ret;
210    }
211
212    pub fn direction(&self) -> WatermarkDirection {
213        self.watermark_direction
214    }
215
216    pub fn add_epoch_watermark(
217        &mut self,
218        epoch: HummockEpoch,
219        vnode_watermark_list: Arc<[VnodeWatermark]>,
220        direction: WatermarkDirection,
221    ) {
222        assert!(epoch > self.latest_epoch);
223        assert_eq!(self.watermark_direction, direction);
224        self.latest_epoch = epoch;
225        #[cfg(debug_assertions)]
226        if !vnode_watermark_list.is_empty() {
227            let vnode_count = vnode_watermark_list[0].vnode_count();
228            let mut vnode_is_set = BitmapBuilder::zeroed(vnode_count);
229            for vnode_watermark in vnode_watermark_list.as_ref() {
230                for vnode in vnode_watermark.vnode_bitmap.iter_ones() {
231                    assert!(!vnode_is_set.is_set(vnode));
232                    vnode_is_set.set(vnode, true);
233                    let vnode = VirtualNode::from_index(vnode);
234                    if let Some(prev_watermark) = self.latest_watermark(vnode) {
235                        match self.watermark_direction {
236                            WatermarkDirection::Ascending => {
237                                assert!(vnode_watermark.watermark >= prev_watermark);
238                            }
239                            WatermarkDirection::Descending => {
240                                assert!(vnode_watermark.watermark <= prev_watermark);
241                            }
242                        };
243                    }
244                }
245            }
246        }
247        self.staging_watermarks
248            .push_back((epoch, vnode_watermark_list));
249    }
250
251    pub fn apply_committed_watermarks(
252        &mut self,
253        committed_watermark: Arc<TableWatermarks>,
254        committed_epoch: HummockEpoch,
255    ) {
256        assert_eq!(self.watermark_direction, committed_watermark.direction);
257        if let Some(prev_committed_epoch) = self.committed_epoch {
258            assert!(prev_committed_epoch <= committed_epoch);
259            if prev_committed_epoch == committed_epoch {
260                return;
261            }
262        }
263        if self.latest_epoch < committed_epoch {
264            debug!(
265                latest_epoch = self.latest_epoch,
266                committed_epoch, "committed_epoch exceed table watermark latest_epoch"
267            );
268            self.latest_epoch = committed_epoch;
269        }
270        self.committed_epoch = Some(committed_epoch);
271        self.committed_watermarks = Some(committed_watermark);
272        // keep only watermark higher than committed epoch
273        while let Some((old_epoch, _)) = self.staging_watermarks.front()
274            && *old_epoch <= committed_epoch
275        {
276            let _ = self.staging_watermarks.pop_front();
277        }
278    }
279}
280
281#[derive(Debug, Clone, Copy, Eq, PartialEq)]
282pub enum WatermarkDirection {
283    Ascending,
284    Descending,
285}
286
287impl Display for WatermarkDirection {
288    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
289        match self {
290            WatermarkDirection::Ascending => write!(f, "Ascending"),
291            WatermarkDirection::Descending => write!(f, "Descending"),
292        }
293    }
294}
295
296impl WatermarkDirection {
297    pub fn key_filter_by_watermark(
298        &self,
299        key: impl AsRef<[u8]>,
300        watermark: impl AsRef<[u8]>,
301    ) -> bool {
302        let key = key.as_ref();
303        let watermark = watermark.as_ref();
304        match self {
305            WatermarkDirection::Ascending => key < watermark,
306            WatermarkDirection::Descending => key > watermark,
307        }
308    }
309
310    pub fn datum_filter_by_watermark(
311        &self,
312        watermark_col_in_pk: impl ToDatumRef,
313        watermark: impl ToDatumRef,
314        order_type: OrderType,
315    ) -> bool {
316        let watermark_col_in_pk = watermark_col_in_pk.to_datum_ref();
317        let watermark = watermark.to_datum_ref();
318        match self {
319            WatermarkDirection::Ascending => {
320                // watermark_col_in_pk < watermark
321                cmp_datum(watermark_col_in_pk, watermark, order_type).is_lt()
322            }
323            WatermarkDirection::Descending => {
324                //  watermark_col_in_pk > watermark
325                cmp_datum(watermark_col_in_pk, watermark, order_type).is_gt()
326            }
327        }
328    }
329
330    pub fn is_ascending(&self) -> bool {
331        match self {
332            WatermarkDirection::Ascending => true,
333            WatermarkDirection::Descending => false,
334        }
335    }
336}
337
338#[derive(Clone, Debug, PartialEq, EstimateSize)]
339pub struct VnodeWatermark {
340    vnode_bitmap: Arc<Bitmap>,
341    watermark: Bytes,
342}
343
344impl VnodeWatermark {
345    pub fn new(vnode_bitmap: Arc<Bitmap>, watermark: Bytes) -> Self {
346        Self {
347            vnode_bitmap,
348            watermark,
349        }
350    }
351
352    pub fn vnode_bitmap(&self) -> &Bitmap {
353        &self.vnode_bitmap
354    }
355
356    /// Vnode count derived from the bitmap.
357    pub fn vnode_count(&self) -> usize {
358        self.vnode_bitmap.len()
359    }
360
361    pub fn watermark(&self) -> &Bytes {
362        &self.watermark
363    }
364
365    pub fn to_protobuf(&self) -> PbVnodeWatermark {
366        self.into()
367    }
368}
369
370impl From<PbVnodeWatermark> for VnodeWatermark {
371    fn from(pb: PbVnodeWatermark) -> Self {
372        Self {
373            vnode_bitmap: Arc::new(Bitmap::from(pb.vnode_bitmap.as_ref().unwrap())),
374            watermark: Bytes::from(pb.watermark),
375        }
376    }
377}
378
379impl From<&PbVnodeWatermark> for VnodeWatermark {
380    fn from(pb: &PbVnodeWatermark) -> Self {
381        Self {
382            vnode_bitmap: Arc::new(Bitmap::from(pb.vnode_bitmap.as_ref().unwrap())),
383            watermark: Bytes::from(pb.watermark.clone()),
384        }
385    }
386}
387
388impl From<VnodeWatermark> for PbVnodeWatermark {
389    fn from(watermark: VnodeWatermark) -> Self {
390        Self {
391            watermark: watermark.watermark.into(),
392            vnode_bitmap: Some(watermark.vnode_bitmap.to_protobuf()),
393        }
394    }
395}
396
397impl From<&VnodeWatermark> for PbVnodeWatermark {
398    fn from(watermark: &VnodeWatermark) -> Self {
399        Self {
400            watermark: watermark.watermark.to_vec(),
401            vnode_bitmap: Some(watermark.vnode_bitmap.to_protobuf()),
402        }
403    }
404}
405
406#[derive(Clone, Debug, PartialEq)]
407pub struct TableWatermarks {
408    // later epoch at the back
409    pub watermarks: Vec<(HummockEpoch, Arc<[VnodeWatermark]>)>,
410    pub direction: WatermarkDirection,
411    pub watermark_type: WatermarkSerdeType,
412}
413
414impl TableWatermarks {
415    pub fn single_epoch(
416        epoch: HummockEpoch,
417        watermarks: Vec<VnodeWatermark>,
418        direction: WatermarkDirection,
419        watermark_type: WatermarkSerdeType,
420    ) -> Self {
421        let mut this = Self {
422            direction,
423            watermarks: Vec::new(),
424            watermark_type,
425        };
426        this.add_new_epoch_watermarks(epoch, watermarks.into(), direction, watermark_type);
427        this
428    }
429
430    pub fn add_new_epoch_watermarks(
431        &mut self,
432        epoch: HummockEpoch,
433        watermarks: Arc<[VnodeWatermark]>,
434        direction: WatermarkDirection,
435        watermark_type: WatermarkSerdeType,
436    ) {
437        assert_eq!(self.direction, direction);
438        assert_eq!(self.watermark_type, watermark_type);
439
440        if let Some((prev_epoch, _)) = self.watermarks.last() {
441            assert!(*prev_epoch < epoch);
442        }
443        if !watermarks.is_empty() {
444            let vnode_count = watermarks[0].vnode_count();
445            for watermark in &*watermarks {
446                assert_eq!(watermark.vnode_count(), vnode_count);
447            }
448            if let Some(existing_vnode_count) = self.vnode_count() {
449                assert_eq!(existing_vnode_count, vnode_count);
450            }
451        }
452        self.watermarks.push((epoch, watermarks));
453    }
454
455    /// Vnode count derived from existing watermarks. Returns `None` if there is no watermark.
456    fn vnode_count(&self) -> Option<usize> {
457        self.watermarks
458            .iter()
459            .flat_map(|(_, watermarks)| watermarks.as_ref())
460            .next()
461            .map(|w| w.vnode_count())
462    }
463
464    pub fn from_protobuf(pb: &PbTableWatermarks) -> Self {
465        Self {
466            watermarks: pb
467                .epoch_watermarks
468                .iter()
469                .map(|epoch_watermark| {
470                    let epoch = epoch_watermark.epoch;
471                    let watermarks = epoch_watermark
472                        .watermarks
473                        .iter()
474                        .map(VnodeWatermark::from)
475                        .collect_vec();
476                    (epoch, Arc::from(watermarks))
477                })
478                .collect(),
479            direction: if pb.is_ascending {
480                WatermarkDirection::Ascending
481            } else {
482                WatermarkDirection::Descending
483            },
484            watermark_type: if pb.is_non_pk_prefix {
485                WatermarkSerdeType::NonPkPrefix
486            } else {
487                WatermarkSerdeType::PkPrefix
488            },
489        }
490    }
491}
492
493pub fn merge_multiple_new_table_watermarks(
494    table_watermarks_list: impl IntoIterator<Item = HashMap<TableId, TableWatermarks>>,
495) -> HashMap<TableId, TableWatermarks> {
496    #[allow(clippy::type_complexity)]
497    let mut ret: HashMap<
498        TableId,
499        (
500            WatermarkDirection,
501            BTreeMap<u64, Vec<VnodeWatermark>>,
502            WatermarkSerdeType,
503        ),
504    > = HashMap::new();
505    for table_watermarks in table_watermarks_list {
506        for (table_id, new_table_watermarks) in table_watermarks {
507            let epoch_watermarks = match ret.entry(table_id) {
508                Entry::Occupied(entry) => {
509                    let (direction, epoch_watermarks, watermark_type) = entry.into_mut();
510                    assert_eq!(&new_table_watermarks.direction, direction);
511                    assert_eq!(&new_table_watermarks.watermark_type, watermark_type);
512                    epoch_watermarks
513                }
514                Entry::Vacant(entry) => {
515                    let (_, epoch_watermarks, _) = entry.insert((
516                        new_table_watermarks.direction,
517                        BTreeMap::new(),
518                        new_table_watermarks.watermark_type,
519                    ));
520                    epoch_watermarks
521                }
522            };
523            for (new_epoch, new_epoch_watermarks) in new_table_watermarks.watermarks {
524                epoch_watermarks
525                    .entry(new_epoch)
526                    .or_insert_with(Vec::new)
527                    .extend(new_epoch_watermarks.iter().cloned());
528            }
529        }
530    }
531    ret.into_iter()
532        .map(
533            |(table_id, (direction, epoch_watermarks, watermark_type))| {
534                (
535                    table_id,
536                    TableWatermarks {
537                        direction,
538                        // ordered from earlier epoch to later epoch
539                        watermarks: epoch_watermarks
540                            .into_iter()
541                            .map(|(epoch, watermarks)| (epoch, Arc::from(watermarks)))
542                            .collect(),
543                        watermark_type,
544                    },
545                )
546            },
547        )
548        .collect()
549}
550
551impl TableWatermarks {
552    pub fn apply_new_table_watermarks(&mut self, newly_added_watermarks: &TableWatermarks) {
553        assert_eq!(self.direction, newly_added_watermarks.direction);
554        assert!(self.watermarks.iter().map(|(epoch, _)| epoch).is_sorted());
555        assert!(
556            newly_added_watermarks
557                .watermarks
558                .iter()
559                .map(|(epoch, _)| epoch)
560                .is_sorted()
561        );
562        // ensure that the newly added watermarks have a later epoch than the previous latest epoch.
563        if let Some((prev_last_epoch, _)) = self.watermarks.last()
564            && let Some((new_first_epoch, _)) = newly_added_watermarks.watermarks.first()
565        {
566            assert!(prev_last_epoch < new_first_epoch);
567        }
568        self.watermarks.extend(
569            newly_added_watermarks
570                .watermarks
571                .iter()
572                .map(|(epoch, new_watermarks)| (*epoch, new_watermarks.clone())),
573        );
574    }
575
576    pub fn clear_stale_epoch_watermark(&mut self, safe_epoch: u64) {
577        match self.watermarks.first() {
578            None => {
579                // return on empty watermark
580                return;
581            }
582            Some((earliest_epoch, _)) => {
583                if *earliest_epoch >= safe_epoch {
584                    // No stale epoch watermark needs to be cleared.
585                    return;
586                }
587            }
588        }
589        debug!("clear stale table watermark below epoch {}", safe_epoch);
590        let mut result_epoch_watermark = Vec::with_capacity(self.watermarks.len());
591        let mut set_vnode: HashSet<VirtualNode> = HashSet::new();
592        let mut vnode_count: Option<usize> = None; // lazy initialized on first occurrence of vnode watermark
593        while let Some((epoch, _)) = self.watermarks.last() {
594            if *epoch >= safe_epoch {
595                let (epoch, watermarks) = self.watermarks.pop().expect("have check Some");
596                for watermark in watermarks.as_ref() {
597                    vnode_count.get_or_insert_with(|| watermark.vnode_count());
598                    for vnode in watermark.vnode_bitmap.iter_vnodes() {
599                        set_vnode.insert(vnode);
600                    }
601                }
602                result_epoch_watermark.push((epoch, watermarks));
603            } else {
604                break;
605            }
606        }
607        while vnode_count != Some(set_vnode.len())
608            && let Some((_, watermarks)) = self.watermarks.pop()
609        {
610            let mut new_vnode_watermarks = Vec::new();
611            for vnode_watermark in watermarks.as_ref() {
612                let mut new_set_vnode = Vec::new();
613                vnode_count.get_or_insert_with(|| vnode_watermark.vnode_count());
614                for vnode in vnode_watermark.vnode_bitmap.iter_vnodes() {
615                    if set_vnode.insert(vnode) {
616                        new_set_vnode.push(vnode);
617                    }
618                }
619                if !new_set_vnode.is_empty() {
620                    let mut builder = BitmapBuilder::zeroed(vnode_watermark.vnode_count());
621                    for vnode in new_set_vnode {
622                        builder.set(vnode.to_index(), true);
623                    }
624                    let bitmap = Arc::new(builder.finish());
625                    new_vnode_watermarks.push(VnodeWatermark {
626                        vnode_bitmap: bitmap,
627                        watermark: vnode_watermark.watermark.clone(),
628                    })
629                }
630            }
631            if !new_vnode_watermarks.is_empty() {
632                if let Some((last_epoch, last_watermarks)) = result_epoch_watermark.last_mut()
633                    && *last_epoch == safe_epoch
634                {
635                    *last_watermarks = Arc::from(
636                        last_watermarks
637                            .iter()
638                            .cloned()
639                            .chain(new_vnode_watermarks.into_iter())
640                            .collect_vec(),
641                    );
642                } else {
643                    result_epoch_watermark.push((safe_epoch, Arc::from(new_vnode_watermarks)));
644                }
645            }
646        }
647        // epoch watermark are added from later epoch to earlier epoch.
648        // reverse to ensure that earlier epochs are at the front
649        result_epoch_watermark.reverse();
650        assert!(
651            result_epoch_watermark
652                .is_sorted_by(|(first_epoch, _), (second_epoch, _)| { first_epoch < second_epoch })
653        );
654        *self = TableWatermarks {
655            watermarks: result_epoch_watermark,
656            direction: self.direction,
657            watermark_type: self.watermark_type,
658        }
659    }
660}
661
662impl TableWatermarks {
663    pub fn estimated_encode_len(&self) -> usize {
664        self.watermarks.len() * size_of::<HummockEpoch>()
665            + self
666                .watermarks
667                .iter()
668                .map(|(_, watermarks)| {
669                    watermarks
670                        .iter()
671                        .map(|watermark| watermark.estimated_size())
672                        .sum::<usize>()
673                })
674                .sum::<usize>()
675            + size_of::<bool>() // for direction
676    }
677
678    pub fn to_protobuf(&self) -> PbTableWatermarks {
679        self.into()
680    }
681}
682
683impl From<&PbTableWatermarks> for TableWatermarks {
684    fn from(pb: &PbTableWatermarks) -> Self {
685        Self {
686            watermarks: pb
687                .epoch_watermarks
688                .iter()
689                .map(|epoch_watermark| {
690                    let epoch = epoch_watermark.epoch;
691                    let watermarks = epoch_watermark
692                        .watermarks
693                        .iter()
694                        .map(VnodeWatermark::from)
695                        .collect();
696                    (epoch, watermarks)
697                })
698                .collect(),
699            direction: if pb.is_ascending {
700                WatermarkDirection::Ascending
701            } else {
702                WatermarkDirection::Descending
703            },
704            watermark_type: if pb.is_non_pk_prefix {
705                WatermarkSerdeType::NonPkPrefix
706            } else {
707                WatermarkSerdeType::PkPrefix
708            },
709        }
710    }
711}
712
713impl From<&TableWatermarks> for PbTableWatermarks {
714    fn from(table_watermarks: &TableWatermarks) -> Self {
715        Self {
716            epoch_watermarks: table_watermarks
717                .watermarks
718                .iter()
719                .map(|(epoch, watermarks)| PbEpochNewWatermarks {
720                    watermarks: watermarks.iter().map(|wm| wm.into()).collect(),
721                    epoch: *epoch,
722                })
723                .collect(),
724            is_ascending: match table_watermarks.direction {
725                WatermarkDirection::Ascending => true,
726                WatermarkDirection::Descending => false,
727            },
728            is_non_pk_prefix: match table_watermarks.watermark_type {
729                WatermarkSerdeType::NonPkPrefix => true,
730                WatermarkSerdeType::PkPrefix => false,
731            },
732        }
733    }
734}
735
736impl From<PbTableWatermarks> for TableWatermarks {
737    fn from(pb: PbTableWatermarks) -> Self {
738        Self {
739            watermarks: pb
740                .epoch_watermarks
741                .into_iter()
742                .map(|epoch_watermark| {
743                    let epoch = epoch_watermark.epoch;
744                    let watermarks = epoch_watermark
745                        .watermarks
746                        .into_iter()
747                        .map(VnodeWatermark::from)
748                        .collect();
749                    (epoch, watermarks)
750                })
751                .collect(),
752            direction: if pb.is_ascending {
753                WatermarkDirection::Ascending
754            } else {
755                WatermarkDirection::Descending
756            },
757            watermark_type: if pb.is_non_pk_prefix {
758                WatermarkSerdeType::NonPkPrefix
759            } else {
760                WatermarkSerdeType::PkPrefix
761            },
762        }
763    }
764}
765
766impl From<TableWatermarks> for PbTableWatermarks {
767    fn from(table_watermarks: TableWatermarks) -> Self {
768        Self {
769            epoch_watermarks: table_watermarks
770                .watermarks
771                .into_iter()
772                .map(|(epoch, watermarks)| PbEpochNewWatermarks {
773                    watermarks: watermarks.iter().map(PbVnodeWatermark::from).collect(),
774                    epoch,
775                })
776                .collect(),
777            is_ascending: match table_watermarks.direction {
778                WatermarkDirection::Ascending => true,
779                WatermarkDirection::Descending => false,
780            },
781            is_non_pk_prefix: match table_watermarks.watermark_type {
782                WatermarkSerdeType::NonPkPrefix => true,
783                WatermarkSerdeType::PkPrefix => false,
784            },
785        }
786    }
787}
788
789#[derive(Debug, Clone, Copy, PartialEq, Eq)]
790pub enum WatermarkSerdeType {
791    PkPrefix,
792    NonPkPrefix,
793}
794
795#[cfg(test)]
796mod tests {
797    use std::collections::Bound::Included;
798    use std::collections::{Bound, HashMap};
799    use std::ops::Bound::Excluded;
800    use std::sync::Arc;
801    use std::vec;
802
803    use bytes::Bytes;
804    use itertools::Itertools;
805    use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
806    use risingwave_common::catalog::TableId;
807    use risingwave_common::hash::VirtualNode;
808    use risingwave_common::util::epoch::{EpochExt, test_epoch};
809    use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo};
810
811    use crate::compaction_group::StaticCompactionGroupId;
812    use crate::key::{TableKeyRange, is_empty_key_range, prefixed_range_with_vnode};
813    use crate::table_watermark::{
814        TableWatermarks, TableWatermarksIndex, VnodeWatermark, WatermarkDirection,
815        WatermarkSerdeType, merge_multiple_new_table_watermarks,
816    };
817    use crate::version::HummockVersion;
818
819    fn build_bitmap(vnodes: impl IntoIterator<Item = usize>) -> Arc<Bitmap> {
820        let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
821        for vnode in vnodes {
822            builder.set(vnode, true);
823        }
824        Arc::new(builder.finish())
825    }
826
827    #[test]
828    fn test_apply_new_table_watermark() {
829        let epoch1 = test_epoch(1);
830        let direction = WatermarkDirection::Ascending;
831        let watermark1 = Bytes::from("watermark1");
832        let watermark2 = Bytes::from("watermark2");
833        let watermark3 = Bytes::from("watermark3");
834        let watermark4 = Bytes::from("watermark4");
835        let watermark_type = WatermarkSerdeType::PkPrefix;
836        let mut table_watermarks = TableWatermarks::single_epoch(
837            epoch1,
838            vec![VnodeWatermark::new(
839                build_bitmap(vec![0, 1, 2]),
840                watermark1.clone(),
841            )],
842            direction,
843            watermark_type,
844        );
845        let epoch2 = epoch1.next_epoch();
846        table_watermarks.add_new_epoch_watermarks(
847            epoch2,
848            vec![VnodeWatermark::new(
849                build_bitmap(vec![0, 1, 2, 3]),
850                watermark2.clone(),
851            )]
852            .into(),
853            direction,
854            watermark_type,
855        );
856
857        let mut table_watermark_checkpoint = table_watermarks.clone();
858
859        let epoch3 = epoch2.next_epoch();
860        let mut second_table_watermark = TableWatermarks::single_epoch(
861            epoch3,
862            vec![VnodeWatermark::new(
863                build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
864                watermark3.clone(),
865            )],
866            direction,
867            watermark_type,
868        );
869        table_watermarks.add_new_epoch_watermarks(
870            epoch3,
871            vec![VnodeWatermark::new(
872                build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
873                watermark3.clone(),
874            )]
875            .into(),
876            direction,
877            watermark_type,
878        );
879        let epoch4 = epoch3.next_epoch();
880        let epoch5 = epoch4.next_epoch();
881        table_watermarks.add_new_epoch_watermarks(
882            epoch5,
883            vec![VnodeWatermark::new(
884                build_bitmap(vec![0, 3, 4]),
885                watermark4.clone(),
886            )]
887            .into(),
888            direction,
889            watermark_type,
890        );
891        second_table_watermark.add_new_epoch_watermarks(
892            epoch5,
893            vec![VnodeWatermark::new(
894                build_bitmap(vec![0, 3, 4]),
895                watermark4.clone(),
896            )]
897            .into(),
898            direction,
899            watermark_type,
900        );
901
902        table_watermark_checkpoint.apply_new_table_watermarks(&second_table_watermark);
903        assert_eq!(table_watermarks, table_watermark_checkpoint);
904    }
905
906    #[test]
907    fn test_clear_stale_epoch_watmermark() {
908        let epoch1 = test_epoch(1);
909        let direction = WatermarkDirection::Ascending;
910        let watermark1 = Bytes::from("watermark1");
911        let watermark2 = Bytes::from("watermark2");
912        let watermark3 = Bytes::from("watermark3");
913        let watermark4 = Bytes::from("watermark4");
914        let watermark_type = WatermarkSerdeType::PkPrefix;
915        let mut table_watermarks = TableWatermarks::single_epoch(
916            epoch1,
917            vec![VnodeWatermark::new(
918                build_bitmap(vec![0, 1, 2]),
919                watermark1.clone(),
920            )],
921            direction,
922            watermark_type,
923        );
924        let epoch2 = epoch1.next_epoch();
925        table_watermarks.add_new_epoch_watermarks(
926            epoch2,
927            vec![VnodeWatermark::new(
928                build_bitmap(vec![0, 1, 2, 3]),
929                watermark2.clone(),
930            )]
931            .into(),
932            direction,
933            watermark_type,
934        );
935        let epoch3 = epoch2.next_epoch();
936        table_watermarks.add_new_epoch_watermarks(
937            epoch3,
938            vec![VnodeWatermark::new(
939                build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
940                watermark3.clone(),
941            )]
942            .into(),
943            direction,
944            watermark_type,
945        );
946        let epoch4 = epoch3.next_epoch();
947        let epoch5 = epoch4.next_epoch();
948        table_watermarks.add_new_epoch_watermarks(
949            epoch5,
950            vec![VnodeWatermark::new(
951                build_bitmap(vec![0, 3, 4]),
952                watermark4.clone(),
953            )]
954            .into(),
955            direction,
956            watermark_type,
957        );
958
959        let mut table_watermarks_checkpoint = table_watermarks.clone();
960        table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch1);
961        assert_eq!(table_watermarks_checkpoint, table_watermarks);
962
963        table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch2);
964        assert_eq!(
965            table_watermarks_checkpoint,
966            TableWatermarks {
967                watermarks: vec![
968                    (
969                        epoch2,
970                        vec![VnodeWatermark::new(
971                            build_bitmap(vec![0, 1, 2, 3]),
972                            watermark2.clone(),
973                        )]
974                        .into()
975                    ),
976                    (
977                        epoch3,
978                        vec![VnodeWatermark::new(
979                            build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
980                            watermark3.clone(),
981                        )]
982                        .into()
983                    ),
984                    (
985                        epoch5,
986                        vec![VnodeWatermark::new(
987                            build_bitmap(vec![0, 3, 4]),
988                            watermark4.clone(),
989                        )]
990                        .into()
991                    )
992                ],
993                direction,
994                watermark_type,
995            }
996        );
997
998        table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch3);
999        assert_eq!(
1000            table_watermarks_checkpoint,
1001            TableWatermarks {
1002                watermarks: vec![
1003                    (
1004                        epoch3,
1005                        vec![VnodeWatermark::new(
1006                            build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
1007                            watermark3.clone(),
1008                        )]
1009                        .into()
1010                    ),
1011                    (
1012                        epoch5,
1013                        vec![VnodeWatermark::new(
1014                            build_bitmap(vec![0, 3, 4]),
1015                            watermark4.clone(),
1016                        )]
1017                        .into()
1018                    )
1019                ],
1020                direction,
1021                watermark_type,
1022            }
1023        );
1024
1025        table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch4);
1026        assert_eq!(
1027            table_watermarks_checkpoint,
1028            TableWatermarks {
1029                watermarks: vec![
1030                    (
1031                        epoch4,
1032                        vec![VnodeWatermark::new(
1033                            build_bitmap((1..3).chain(5..VirtualNode::COUNT_FOR_TEST)),
1034                            watermark3.clone()
1035                        )]
1036                        .into()
1037                    ),
1038                    (
1039                        epoch5,
1040                        vec![VnodeWatermark::new(
1041                            build_bitmap(vec![0, 3, 4]),
1042                            watermark4.clone(),
1043                        )]
1044                        .into()
1045                    )
1046                ],
1047                direction,
1048                watermark_type,
1049            }
1050        );
1051
1052        table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch5);
1053        assert_eq!(
1054            table_watermarks_checkpoint,
1055            TableWatermarks {
1056                watermarks: vec![(
1057                    epoch5,
1058                    vec![
1059                        VnodeWatermark::new(build_bitmap(vec![0, 3, 4]), watermark4.clone()),
1060                        VnodeWatermark::new(
1061                            build_bitmap((1..3).chain(5..VirtualNode::COUNT_FOR_TEST)),
1062                            watermark3.clone()
1063                        )
1064                    ]
1065                    .into()
1066                )],
1067                direction,
1068                watermark_type,
1069            }
1070        );
1071    }
1072
1073    #[test]
1074    fn test_merge_multiple_new_table_watermarks() {
1075        fn epoch_new_watermark(epoch: u64, bitmaps: Vec<&Bitmap>) -> (u64, Arc<[VnodeWatermark]>) {
1076            (
1077                epoch,
1078                bitmaps
1079                    .into_iter()
1080                    .map(|bitmap| VnodeWatermark {
1081                        watermark: Bytes::from(vec![1, 2, epoch as _]),
1082                        vnode_bitmap: Arc::new(bitmap.clone()),
1083                    })
1084                    .collect_vec()
1085                    .into(),
1086            )
1087        }
1088        fn build_table_watermark(
1089            vnodes: impl IntoIterator<Item = usize>,
1090            epochs: impl IntoIterator<Item = u64>,
1091        ) -> TableWatermarks {
1092            let bitmap = build_bitmap(vnodes);
1093            TableWatermarks {
1094                watermarks: epochs
1095                    .into_iter()
1096                    .map(|epoch: u64| epoch_new_watermark(epoch, vec![&bitmap]))
1097                    .collect(),
1098                direction: WatermarkDirection::Ascending,
1099                watermark_type: WatermarkSerdeType::PkPrefix,
1100            }
1101        }
1102        let table1_watermark1 = build_table_watermark(0..3, vec![1, 2, 4]);
1103        let table1_watermark2 = build_table_watermark(4..6, vec![1, 2, 5]);
1104        let table2_watermark = build_table_watermark(0..4, 1..3);
1105        let table3_watermark = build_table_watermark(0..4, 3..5);
1106        let mut first = HashMap::new();
1107        first.insert(TableId::new(1), table1_watermark1);
1108        first.insert(TableId::new(2), table2_watermark.clone());
1109        let mut second = HashMap::new();
1110        second.insert(TableId::new(1), table1_watermark2);
1111        second.insert(TableId::new(3), table3_watermark.clone());
1112        let result = merge_multiple_new_table_watermarks(vec![first, second]);
1113        let mut expected = HashMap::new();
1114        expected.insert(
1115            TableId::new(1),
1116            TableWatermarks {
1117                watermarks: vec![
1118                    epoch_new_watermark(1, vec![&build_bitmap(0..3), &build_bitmap(4..6)]),
1119                    epoch_new_watermark(2, vec![&build_bitmap(0..3), &build_bitmap(4..6)]),
1120                    epoch_new_watermark(4, vec![&build_bitmap(0..3)]),
1121                    epoch_new_watermark(5, vec![&build_bitmap(4..6)]),
1122                ],
1123                direction: WatermarkDirection::Ascending,
1124                watermark_type: WatermarkSerdeType::PkPrefix,
1125            },
1126        );
1127        expected.insert(TableId::new(2), table2_watermark);
1128        expected.insert(TableId::new(3), table3_watermark);
1129        assert_eq!(result, expected);
1130    }
1131
1132    const COMMITTED_EPOCH: u64 = test_epoch(1);
1133    const EPOCH1: u64 = test_epoch(2);
1134    const EPOCH2: u64 = test_epoch(3);
1135    const TEST_SINGLE_VNODE: VirtualNode = VirtualNode::from_index(1);
1136
1137    fn build_watermark_range(
1138        direction: WatermarkDirection,
1139        (low, high): (Bound<Bytes>, Bound<Bytes>),
1140    ) -> TableKeyRange {
1141        let range = match direction {
1142            WatermarkDirection::Ascending => (low, high),
1143            WatermarkDirection::Descending => (high, low),
1144        };
1145        prefixed_range_with_vnode(range, TEST_SINGLE_VNODE)
1146    }
1147
1148    /// Build and return a watermark index with the following watermarks
1149    /// EPOCH1 bitmap(0, 1, 2, 3) watermark1
1150    /// EPOCH2 bitmap(1, 2, 3, 4) watermark2
1151    fn build_and_test_watermark_index(
1152        direction: WatermarkDirection,
1153        watermark1: Bytes,
1154        watermark2: Bytes,
1155        watermark3: Bytes,
1156    ) -> TableWatermarksIndex {
1157        let mut index = TableWatermarksIndex::new(
1158            direction,
1159            EPOCH1,
1160            vec![VnodeWatermark::new(build_bitmap(0..4), watermark1.clone())],
1161            Some(COMMITTED_EPOCH),
1162        );
1163        index.add_epoch_watermark(
1164            EPOCH2,
1165            vec![VnodeWatermark::new(build_bitmap(1..5), watermark2.clone())].into(),
1166            direction,
1167        );
1168
1169        assert_eq!(
1170            index.read_watermark(VirtualNode::from_index(0), EPOCH1),
1171            Some(watermark1.clone())
1172        );
1173        assert_eq!(
1174            index.read_watermark(VirtualNode::from_index(1), EPOCH1),
1175            Some(watermark1.clone())
1176        );
1177        assert_eq!(
1178            index.read_watermark(VirtualNode::from_index(4), EPOCH1),
1179            None
1180        );
1181        assert_eq!(
1182            index.read_watermark(VirtualNode::from_index(0), EPOCH2),
1183            Some(watermark1.clone())
1184        );
1185        assert_eq!(
1186            index.read_watermark(VirtualNode::from_index(1), EPOCH2),
1187            Some(watermark2.clone())
1188        );
1189        assert_eq!(
1190            index.read_watermark(VirtualNode::from_index(4), EPOCH2),
1191            Some(watermark2.clone())
1192        );
1193        assert_eq!(
1194            index.latest_watermark(VirtualNode::from_index(0)),
1195            Some(watermark1.clone())
1196        );
1197        assert_eq!(
1198            index.latest_watermark(VirtualNode::from_index(1)),
1199            Some(watermark2.clone())
1200        );
1201        assert_eq!(
1202            index.latest_watermark(VirtualNode::from_index(4)),
1203            Some(watermark2.clone())
1204        );
1205
1206        // watermark is watermark2
1207        let check_watermark_range =
1208            |query_range: (Bound<Bytes>, Bound<Bytes>),
1209             output_range: Option<(Bound<Bytes>, Bound<Bytes>)>| {
1210                let mut range = build_watermark_range(direction, query_range);
1211                index.rewrite_range_with_table_watermark(EPOCH2, &mut range);
1212                if let Some(output_range) = output_range {
1213                    assert_eq!(range, build_watermark_range(direction, output_range));
1214                } else {
1215                    assert!(is_empty_key_range(&range));
1216                }
1217            };
1218
1219        // test read from single vnode and truncate begin key range
1220        check_watermark_range(
1221            (Included(watermark1.clone()), Excluded(watermark3.clone())),
1222            Some((Included(watermark2.clone()), Excluded(watermark3.clone()))),
1223        );
1224
1225        // test read from single vnode and begin key right at watermark
1226        check_watermark_range(
1227            (Included(watermark2.clone()), Excluded(watermark3.clone())),
1228            Some((Included(watermark2.clone()), Excluded(watermark3.clone()))),
1229        );
1230        check_watermark_range(
1231            (Excluded(watermark2.clone()), Excluded(watermark3.clone())),
1232            Some((Excluded(watermark2.clone()), Excluded(watermark3.clone()))),
1233        );
1234
1235        // test read from single vnode and end key right at watermark
1236        check_watermark_range(
1237            (Excluded(watermark1.clone()), Excluded(watermark2.clone())),
1238            None,
1239        );
1240        check_watermark_range(
1241            (Excluded(watermark1.clone()), Included(watermark2.clone())),
1242            Some((Included(watermark2.clone()), Included(watermark2.clone()))),
1243        );
1244
1245        index
1246    }
1247
1248    #[test]
1249    fn test_watermark_index_ascending() {
1250        let watermark1 = Bytes::from_static(b"watermark1");
1251        let watermark2 = Bytes::from_static(b"watermark2");
1252        let watermark3 = Bytes::from_static(b"watermark3");
1253        build_and_test_watermark_index(
1254            WatermarkDirection::Ascending,
1255            watermark1.clone(),
1256            watermark2.clone(),
1257            watermark3.clone(),
1258        );
1259    }
1260
1261    #[test]
1262    fn test_watermark_index_descending() {
1263        let watermark1 = Bytes::from_static(b"watermark254");
1264        let watermark2 = Bytes::from_static(b"watermark253");
1265        let watermark3 = Bytes::from_static(b"watermark252");
1266        build_and_test_watermark_index(
1267            WatermarkDirection::Descending,
1268            watermark1.clone(),
1269            watermark2.clone(),
1270            watermark3.clone(),
1271        );
1272    }
1273
1274    #[test]
1275    fn test_apply_committed_index() {
1276        let watermark1 = Bytes::from_static(b"watermark1");
1277        let watermark2 = Bytes::from_static(b"watermark2");
1278        let watermark3 = Bytes::from_static(b"watermark3");
1279        let mut index = build_and_test_watermark_index(
1280            WatermarkDirection::Ascending,
1281            watermark1.clone(),
1282            watermark2.clone(),
1283            watermark3.clone(),
1284        );
1285
1286        let test_table_id = TableId::from(233);
1287
1288        let mut version = HummockVersion::from_rpc_protobuf(&PbHummockVersion {
1289            state_table_info: HashMap::from_iter([(
1290                test_table_id.table_id,
1291                StateTableInfo {
1292                    committed_epoch: EPOCH1,
1293                    compaction_group_id: StaticCompactionGroupId::StateDefault as _,
1294                },
1295            )]),
1296            ..Default::default()
1297        });
1298        version.table_watermarks.insert(
1299            test_table_id,
1300            TableWatermarks {
1301                watermarks: vec![(
1302                    EPOCH1,
1303                    vec![VnodeWatermark {
1304                        watermark: watermark1.clone(),
1305                        vnode_bitmap: build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
1306                    }]
1307                    .into(),
1308                )],
1309                direction: WatermarkDirection::Ascending,
1310                watermark_type: WatermarkSerdeType::PkPrefix,
1311            }
1312            .into(),
1313        );
1314        index.apply_committed_watermarks(
1315            version
1316                .table_watermarks
1317                .get(&test_table_id)
1318                .unwrap()
1319                .clone(),
1320            EPOCH1,
1321        );
1322        assert_eq!(EPOCH1, index.committed_epoch.unwrap());
1323        assert_eq!(EPOCH2, index.latest_epoch);
1324        for vnode in 0..VirtualNode::COUNT_FOR_TEST {
1325            let vnode = VirtualNode::from_index(vnode);
1326            if (1..5).contains(&vnode.to_index()) {
1327                assert_eq!(watermark1, index.read_watermark(vnode, EPOCH1).unwrap());
1328                assert_eq!(watermark2, index.read_watermark(vnode, EPOCH2).unwrap());
1329            } else {
1330                assert_eq!(watermark1, index.read_watermark(vnode, EPOCH1).unwrap());
1331            }
1332        }
1333    }
1334
1335    #[test]
1336    fn test_filter_regress_watermark() {
1337        let watermark1 = Bytes::from_static(b"watermark1");
1338        let watermark2 = Bytes::from_static(b"watermark2");
1339        let watermark3 = Bytes::from_static(b"watermark3");
1340        let index = build_and_test_watermark_index(
1341            WatermarkDirection::Ascending,
1342            watermark1.clone(),
1343            watermark2.clone(),
1344            watermark3.clone(),
1345        );
1346
1347        let mut new_watermarks = vec![
1348            // Partial regress
1349            VnodeWatermark {
1350                vnode_bitmap: build_bitmap(0..2),
1351                watermark: watermark1.clone(),
1352            },
1353            // All not regress
1354            VnodeWatermark {
1355                vnode_bitmap: build_bitmap(2..4),
1356                watermark: watermark3.clone(),
1357            },
1358            // All regress
1359            VnodeWatermark {
1360                vnode_bitmap: build_bitmap(4..5),
1361                watermark: watermark1.clone(),
1362            },
1363            // All newly set vnode
1364            VnodeWatermark {
1365                vnode_bitmap: build_bitmap(5..6),
1366                watermark: watermark3.clone(),
1367            },
1368        ];
1369
1370        index.filter_regress_watermarks(&mut new_watermarks);
1371
1372        assert_eq!(
1373            new_watermarks,
1374            vec![
1375                VnodeWatermark {
1376                    vnode_bitmap: build_bitmap(0..1),
1377                    watermark: watermark1,
1378                },
1379                VnodeWatermark {
1380                    vnode_bitmap: build_bitmap(2..4),
1381                    watermark: watermark3.clone(),
1382                },
1383                VnodeWatermark {
1384                    vnode_bitmap: build_bitmap(5..6),
1385                    watermark: watermark3,
1386                },
1387            ]
1388        );
1389    }
1390}