risingwave_hummock_sdk/
table_watermark.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
17use std::fmt::Display;
18use std::mem::size_of;
19use std::ops::Bound::{Excluded, Included, Unbounded};
20use std::sync::Arc;
21
22use bytes::Bytes;
23use itertools::Itertools;
24use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
25use risingwave_common::catalog::TableId;
26use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
27use risingwave_common::types::ToDatumRef;
28use risingwave_common::util::sort_util::{OrderType, cmp_datum};
29use risingwave_common_estimate_size::EstimateSize;
30use risingwave_pb::hummock::table_watermarks::PbEpochNewWatermarks;
31use risingwave_pb::hummock::{PbVnodeWatermark, TableWatermarks as PbTableWatermarks};
32use tracing::{debug, warn};
33
34use crate::HummockEpoch;
35use crate::key::{TableKey, TableKeyRange, prefix_slice_with_vnode, vnode};
36
37#[derive(Clone)]
38pub struct ReadTableWatermark {
39    pub direction: WatermarkDirection,
40    pub vnode_watermarks: BTreeMap<VirtualNode, Bytes>,
41}
42
43#[derive(Clone)]
44pub struct PkPrefixTableWatermarksIndex {
45    pub watermark_direction: WatermarkDirection,
46    // later epoch at the back
47    pub staging_watermarks: VecDeque<(HummockEpoch, Arc<[VnodeWatermark]>)>,
48    pub committed_watermarks: Option<Arc<TableWatermarks>>,
49    latest_epoch: HummockEpoch,
50    committed_epoch: Option<HummockEpoch>,
51}
52
53impl PkPrefixTableWatermarksIndex {
54    pub fn new(
55        watermark_direction: WatermarkDirection,
56        first_epoch: HummockEpoch,
57        first_vnode_watermark: Vec<VnodeWatermark>,
58        committed_epoch: Option<HummockEpoch>,
59    ) -> Self {
60        if let Some(committed_epoch) = committed_epoch {
61            assert!(first_epoch > committed_epoch);
62        }
63        Self {
64            watermark_direction,
65            staging_watermarks: VecDeque::from_iter([(
66                first_epoch,
67                Arc::from(first_vnode_watermark),
68            )]),
69            committed_watermarks: None,
70            latest_epoch: first_epoch,
71            committed_epoch,
72        }
73    }
74
75    pub fn new_committed(
76        committed_watermarks: Arc<TableWatermarks>,
77        committed_epoch: HummockEpoch,
78    ) -> Self {
79        assert_eq!(
80            committed_watermarks.watermark_type,
81            WatermarkSerdeType::PkPrefix
82        );
83        Self {
84            watermark_direction: committed_watermarks.direction,
85            staging_watermarks: VecDeque::new(),
86            committed_epoch: Some(committed_epoch),
87            latest_epoch: committed_epoch,
88            committed_watermarks: Some(committed_watermarks),
89        }
90    }
91
92    pub fn read_watermark(&self, vnode: VirtualNode, epoch: HummockEpoch) -> Option<Bytes> {
93        // iterate from new epoch to old epoch
94        for (watermark_epoch, vnode_watermark_list) in self.staging_watermarks.iter().rev().chain(
95            self.committed_watermarks
96                .iter()
97                .flat_map(|watermarks| watermarks.watermarks.iter().rev()),
98        ) {
99            if *watermark_epoch > epoch {
100                continue;
101            }
102            for vnode_watermark in vnode_watermark_list.as_ref() {
103                if vnode_watermark.vnode_bitmap.is_set(vnode.to_index()) {
104                    return Some(vnode_watermark.watermark.clone());
105                }
106            }
107        }
108        None
109    }
110
111    pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
112        self.read_watermark(vnode, HummockEpoch::MAX)
113    }
114
115    pub fn rewrite_range_with_table_watermark(
116        &self,
117        epoch: HummockEpoch,
118        key_range: &mut TableKeyRange,
119    ) {
120        let vnode = vnode(key_range);
121        if let Some(watermark) = self.read_watermark(vnode, epoch) {
122            match self.watermark_direction {
123                WatermarkDirection::Ascending => {
124                    let overwrite_start_key = match &key_range.0 {
125                        Included(start_key) | Excluded(start_key) => {
126                            start_key.key_part() < watermark
127                        }
128                        Unbounded => true,
129                    };
130                    if overwrite_start_key {
131                        let watermark_key = TableKey(prefix_slice_with_vnode(vnode, &watermark));
132                        let fully_filtered = match &key_range.1 {
133                            Included(end_key) => end_key < &watermark_key,
134                            Excluded(end_key) => end_key <= &watermark_key,
135                            Unbounded => false,
136                        };
137                        if fully_filtered {
138                            key_range.1 = Excluded(watermark_key.clone());
139                        }
140                        key_range.0 = Included(watermark_key);
141                    }
142                }
143                WatermarkDirection::Descending => {
144                    let overwrite_end_key = match &key_range.1 {
145                        Included(end_key) | Excluded(end_key) => end_key.key_part() > watermark,
146                        Unbounded => true,
147                    };
148                    if overwrite_end_key {
149                        let watermark_key = TableKey(prefix_slice_with_vnode(vnode, &watermark));
150                        let fully_filtered = match &key_range.0 {
151                            Included(start_key) => start_key > &watermark_key,
152                            Excluded(start_key) => start_key >= &watermark_key,
153                            Unbounded => false,
154                        };
155                        if fully_filtered {
156                            *key_range = (Included(watermark_key.clone()), Excluded(watermark_key));
157                        } else {
158                            key_range.1 = Included(watermark_key);
159                        }
160                    }
161                }
162            }
163        }
164    }
165
166    pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
167        let mut ret = Vec::with_capacity(watermarks.len());
168        for watermark in watermarks.drain(..) {
169            let vnode_count = watermark.vnode_count();
170
171            let mut regress_vnodes = None;
172            for vnode in watermark.vnode_bitmap.iter_vnodes() {
173                if let Some(prev_watermark) = self.latest_watermark(vnode) {
174                    let is_regress = match self.direction() {
175                        WatermarkDirection::Ascending => prev_watermark > watermark.watermark,
176                        WatermarkDirection::Descending => prev_watermark < watermark.watermark,
177                    };
178                    if is_regress {
179                        warn!(
180                            "table watermark regress: {:?} {} {:?} {:?}",
181                            self.direction(),
182                            vnode,
183                            watermark.watermark,
184                            prev_watermark
185                        );
186                        regress_vnodes
187                            .get_or_insert_with(|| BitmapBuilder::zeroed(vnode_count))
188                            .set(vnode.to_index(), true);
189                    }
190                }
191            }
192            if let Some(regress_vnodes) = regress_vnodes {
193                let mut bitmap_builder = None;
194                for vnode in watermark.vnode_bitmap.iter_vnodes() {
195                    let vnode_index = vnode.to_index();
196                    if !regress_vnodes.is_set(vnode_index) {
197                        bitmap_builder
198                            .get_or_insert_with(|| BitmapBuilder::zeroed(vnode_count))
199                            .set(vnode_index, true);
200                    }
201                }
202                if let Some(bitmap_builder) = bitmap_builder {
203                    ret.push(VnodeWatermark::new(
204                        Arc::new(bitmap_builder.finish()),
205                        watermark.watermark,
206                    ));
207                }
208            } else {
209                // no vnode has regress watermark
210                ret.push(watermark);
211            }
212        }
213        *watermarks = ret;
214    }
215
216    pub fn direction(&self) -> WatermarkDirection {
217        self.watermark_direction
218    }
219
220    pub fn add_epoch_watermark(
221        &mut self,
222        epoch: HummockEpoch,
223        vnode_watermark_list: Arc<[VnodeWatermark]>,
224        direction: WatermarkDirection,
225    ) {
226        assert!(epoch > self.latest_epoch);
227        assert_eq!(self.watermark_direction, direction);
228        self.latest_epoch = epoch;
229        #[cfg(debug_assertions)]
230        if !vnode_watermark_list.is_empty() {
231            let vnode_count = vnode_watermark_list[0].vnode_count();
232            let mut vnode_is_set = BitmapBuilder::zeroed(vnode_count);
233            for vnode_watermark in vnode_watermark_list.as_ref() {
234                for vnode in vnode_watermark.vnode_bitmap.iter_ones() {
235                    assert!(!vnode_is_set.is_set(vnode));
236                    vnode_is_set.set(vnode, true);
237                    let vnode = VirtualNode::from_index(vnode);
238                    if let Some(prev_watermark) = self.latest_watermark(vnode) {
239                        match self.watermark_direction {
240                            WatermarkDirection::Ascending => {
241                                assert!(vnode_watermark.watermark >= prev_watermark);
242                            }
243                            WatermarkDirection::Descending => {
244                                assert!(vnode_watermark.watermark <= prev_watermark);
245                            }
246                        };
247                    }
248                }
249            }
250        }
251        self.staging_watermarks
252            .push_back((epoch, vnode_watermark_list));
253    }
254
255    pub fn apply_committed_watermarks(
256        &mut self,
257        committed_watermark: Arc<TableWatermarks>,
258        committed_epoch: HummockEpoch,
259    ) {
260        assert_eq!(
261            committed_watermark.watermark_type,
262            WatermarkSerdeType::PkPrefix
263        );
264        assert_eq!(self.watermark_direction, committed_watermark.direction);
265        if let Some(prev_committed_epoch) = self.committed_epoch {
266            assert!(prev_committed_epoch <= committed_epoch);
267            if prev_committed_epoch == committed_epoch {
268                return;
269            }
270        }
271        if self.latest_epoch < committed_epoch {
272            debug!(
273                latest_epoch = self.latest_epoch,
274                committed_epoch, "committed_epoch exceed table watermark latest_epoch"
275            );
276            self.latest_epoch = committed_epoch;
277        }
278        self.committed_epoch = Some(committed_epoch);
279        self.committed_watermarks = Some(committed_watermark);
280        // keep only watermark higher than committed epoch
281        while let Some((old_epoch, _)) = self.staging_watermarks.front()
282            && *old_epoch <= committed_epoch
283        {
284            let _ = self.staging_watermarks.pop_front();
285        }
286    }
287}
288
/// Direction in which a table watermark advances and filters keys.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WatermarkDirection {
    /// Keys ordered before the watermark are filtered.
    Ascending,
    /// Keys ordered after the watermark are filtered.
    Descending,
}
294
295impl Display for WatermarkDirection {
296    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
297        match self {
298            WatermarkDirection::Ascending => write!(f, "Ascending"),
299            WatermarkDirection::Descending => write!(f, "Descending"),
300        }
301    }
302}
303
304impl WatermarkDirection {
305    pub fn key_filter_by_watermark(
306        &self,
307        key: impl AsRef<[u8]>,
308        watermark: impl AsRef<[u8]>,
309    ) -> bool {
310        let key = key.as_ref();
311        let watermark = watermark.as_ref();
312        match self {
313            WatermarkDirection::Ascending => key < watermark,
314            WatermarkDirection::Descending => key > watermark,
315        }
316    }
317
318    pub fn datum_filter_by_watermark(
319        &self,
320        watermark_col_in_pk: impl ToDatumRef,
321        watermark: impl ToDatumRef,
322        order_type: OrderType,
323    ) -> bool {
324        let watermark_col_in_pk = watermark_col_in_pk.to_datum_ref();
325        let watermark = watermark.to_datum_ref();
326        match self {
327            WatermarkDirection::Ascending => {
328                // watermark_col_in_pk < watermark
329                cmp_datum(watermark_col_in_pk, watermark, order_type).is_lt()
330            }
331            WatermarkDirection::Descending => {
332                //  watermark_col_in_pk > watermark
333                cmp_datum(watermark_col_in_pk, watermark, order_type).is_gt()
334            }
335        }
336    }
337
338    pub fn is_ascending(&self) -> bool {
339        match self {
340            WatermarkDirection::Ascending => true,
341            WatermarkDirection::Descending => false,
342        }
343    }
344}
345
346#[derive(Clone, Debug, PartialEq, EstimateSize)]
347pub struct VnodeWatermark {
348    vnode_bitmap: Arc<Bitmap>,
349    watermark: Bytes,
350}
351
352impl VnodeWatermark {
353    pub fn new(vnode_bitmap: Arc<Bitmap>, watermark: Bytes) -> Self {
354        Self {
355            vnode_bitmap,
356            watermark,
357        }
358    }
359
360    pub fn vnode_bitmap(&self) -> &Bitmap {
361        &self.vnode_bitmap
362    }
363
364    /// Vnode count derived from the bitmap.
365    pub fn vnode_count(&self) -> usize {
366        self.vnode_bitmap.len()
367    }
368
369    pub fn watermark(&self) -> &Bytes {
370        &self.watermark
371    }
372
373    pub fn to_protobuf(&self) -> PbVnodeWatermark {
374        self.into()
375    }
376}
377
378impl From<PbVnodeWatermark> for VnodeWatermark {
379    fn from(pb: PbVnodeWatermark) -> Self {
380        Self {
381            vnode_bitmap: Arc::new(Bitmap::from(pb.vnode_bitmap.as_ref().unwrap())),
382            watermark: Bytes::from(pb.watermark),
383        }
384    }
385}
386
387impl From<&PbVnodeWatermark> for VnodeWatermark {
388    fn from(pb: &PbVnodeWatermark) -> Self {
389        Self {
390            vnode_bitmap: Arc::new(Bitmap::from(pb.vnode_bitmap.as_ref().unwrap())),
391            watermark: Bytes::from(pb.watermark.clone()),
392        }
393    }
394}
395
396impl From<VnodeWatermark> for PbVnodeWatermark {
397    fn from(watermark: VnodeWatermark) -> Self {
398        Self {
399            watermark: watermark.watermark.into(),
400            vnode_bitmap: Some(watermark.vnode_bitmap.to_protobuf()),
401        }
402    }
403}
404
405impl From<&VnodeWatermark> for PbVnodeWatermark {
406    fn from(watermark: &VnodeWatermark) -> Self {
407        Self {
408            watermark: watermark.watermark.to_vec(),
409            vnode_bitmap: Some(watermark.vnode_bitmap.to_protobuf()),
410        }
411    }
412}
413
414#[derive(Clone, Debug, PartialEq)]
415pub struct TableWatermarks {
416    // later epoch at the back
417    pub watermarks: Vec<(HummockEpoch, Arc<[VnodeWatermark]>)>,
418    pub direction: WatermarkDirection,
419    pub watermark_type: WatermarkSerdeType,
420}
421
422impl TableWatermarks {
423    pub fn single_epoch(
424        epoch: HummockEpoch,
425        watermarks: Vec<VnodeWatermark>,
426        direction: WatermarkDirection,
427        watermark_type: WatermarkSerdeType,
428    ) -> Self {
429        let mut this = Self {
430            direction,
431            watermarks: Vec::new(),
432            watermark_type,
433        };
434        this.add_new_epoch_watermarks(epoch, watermarks.into(), direction, watermark_type);
435        this
436    }
437
438    pub fn add_new_epoch_watermarks(
439        &mut self,
440        epoch: HummockEpoch,
441        watermarks: Arc<[VnodeWatermark]>,
442        direction: WatermarkDirection,
443        watermark_type: WatermarkSerdeType,
444    ) {
445        assert_eq!(self.direction, direction);
446        assert_eq!(self.watermark_type, watermark_type);
447
448        if let Some((prev_epoch, _)) = self.watermarks.last() {
449            assert!(*prev_epoch < epoch);
450        }
451        if !watermarks.is_empty() {
452            let vnode_count = watermarks[0].vnode_count();
453            for watermark in &*watermarks {
454                assert_eq!(watermark.vnode_count(), vnode_count);
455            }
456            if let Some(existing_vnode_count) = self.vnode_count() {
457                assert_eq!(existing_vnode_count, vnode_count);
458            }
459        }
460        self.watermarks.push((epoch, watermarks));
461    }
462
463    /// Vnode count derived from existing watermarks. Returns `None` if there is no watermark.
464    fn vnode_count(&self) -> Option<usize> {
465        self.watermarks
466            .iter()
467            .flat_map(|(_, watermarks)| watermarks.as_ref())
468            .next()
469            .map(|w| w.vnode_count())
470    }
471
472    pub fn from_protobuf(pb: &PbTableWatermarks) -> Self {
473        Self {
474            watermarks: pb
475                .epoch_watermarks
476                .iter()
477                .map(|epoch_watermark| {
478                    let epoch = epoch_watermark.epoch;
479                    let watermarks = epoch_watermark
480                        .watermarks
481                        .iter()
482                        .map(VnodeWatermark::from)
483                        .collect_vec();
484                    (epoch, Arc::from(watermarks))
485                })
486                .collect(),
487            direction: if pb.is_ascending {
488                WatermarkDirection::Ascending
489            } else {
490                WatermarkDirection::Descending
491            },
492            watermark_type: if pb.is_non_pk_prefix {
493                WatermarkSerdeType::NonPkPrefix
494            } else {
495                WatermarkSerdeType::PkPrefix
496            },
497        }
498    }
499}
500
501pub fn merge_multiple_new_table_watermarks(
502    table_watermarks_list: impl IntoIterator<Item = HashMap<TableId, TableWatermarks>>,
503) -> HashMap<TableId, TableWatermarks> {
504    #[allow(clippy::type_complexity)]
505    let mut ret: HashMap<
506        TableId,
507        (
508            WatermarkDirection,
509            BTreeMap<u64, Vec<VnodeWatermark>>,
510            WatermarkSerdeType,
511        ),
512    > = HashMap::new();
513    for table_watermarks in table_watermarks_list {
514        for (table_id, new_table_watermarks) in table_watermarks {
515            let epoch_watermarks = match ret.entry(table_id) {
516                Entry::Occupied(entry) => {
517                    let (direction, epoch_watermarks, watermark_type) = entry.into_mut();
518                    assert_eq!(&new_table_watermarks.direction, direction);
519                    assert_eq!(&new_table_watermarks.watermark_type, watermark_type);
520                    epoch_watermarks
521                }
522                Entry::Vacant(entry) => {
523                    let (_, epoch_watermarks, _) = entry.insert((
524                        new_table_watermarks.direction,
525                        BTreeMap::new(),
526                        new_table_watermarks.watermark_type,
527                    ));
528                    epoch_watermarks
529                }
530            };
531            for (new_epoch, new_epoch_watermarks) in new_table_watermarks.watermarks {
532                epoch_watermarks
533                    .entry(new_epoch)
534                    .or_insert_with(Vec::new)
535                    .extend(new_epoch_watermarks.iter().cloned());
536            }
537        }
538    }
539    ret.into_iter()
540        .map(
541            |(table_id, (direction, epoch_watermarks, watermark_type))| {
542                (
543                    table_id,
544                    TableWatermarks {
545                        direction,
546                        // ordered from earlier epoch to later epoch
547                        watermarks: epoch_watermarks
548                            .into_iter()
549                            .map(|(epoch, watermarks)| (epoch, Arc::from(watermarks)))
550                            .collect(),
551                        watermark_type,
552                    },
553                )
554            },
555        )
556        .collect()
557}
558
559impl TableWatermarks {
560    pub fn apply_new_table_watermarks(&mut self, newly_added_watermarks: &TableWatermarks) {
561        assert_eq!(self.direction, newly_added_watermarks.direction);
562        assert!(self.watermarks.iter().map(|(epoch, _)| epoch).is_sorted());
563        assert!(
564            newly_added_watermarks
565                .watermarks
566                .iter()
567                .map(|(epoch, _)| epoch)
568                .is_sorted()
569        );
570        // ensure that the newly added watermarks have a later epoch than the previous latest epoch.
571        if let Some((prev_last_epoch, _)) = self.watermarks.last()
572            && let Some((new_first_epoch, _)) = newly_added_watermarks.watermarks.first()
573        {
574            assert!(prev_last_epoch < new_first_epoch);
575        }
576        self.watermarks.extend(
577            newly_added_watermarks
578                .watermarks
579                .iter()
580                .map(|(epoch, new_watermarks)| (*epoch, new_watermarks.clone())),
581        );
582    }
583
584    pub fn clear_stale_epoch_watermark(&mut self, safe_epoch: u64) {
585        match self.watermarks.first() {
586            None => {
587                // return on empty watermark
588                return;
589            }
590            Some((earliest_epoch, _)) => {
591                if *earliest_epoch >= safe_epoch {
592                    // No stale epoch watermark needs to be cleared.
593                    return;
594                }
595            }
596        }
597        debug!("clear stale table watermark below epoch {}", safe_epoch);
598        let mut result_epoch_watermark = Vec::with_capacity(self.watermarks.len());
599        let mut set_vnode: HashSet<VirtualNode> = HashSet::new();
600        let mut vnode_count: Option<usize> = None; // lazy initialized on first occurrence of vnode watermark
601        while let Some((epoch, _)) = self.watermarks.last() {
602            if *epoch >= safe_epoch {
603                let (epoch, watermarks) = self.watermarks.pop().expect("have check Some");
604                for watermark in watermarks.as_ref() {
605                    vnode_count.get_or_insert_with(|| watermark.vnode_count());
606                    for vnode in watermark.vnode_bitmap.iter_vnodes() {
607                        set_vnode.insert(vnode);
608                    }
609                }
610                result_epoch_watermark.push((epoch, watermarks));
611            } else {
612                break;
613            }
614        }
615        while vnode_count != Some(set_vnode.len())
616            && let Some((_, watermarks)) = self.watermarks.pop()
617        {
618            let mut new_vnode_watermarks = Vec::new();
619            for vnode_watermark in watermarks.as_ref() {
620                let mut new_set_vnode = Vec::new();
621                vnode_count.get_or_insert_with(|| vnode_watermark.vnode_count());
622                for vnode in vnode_watermark.vnode_bitmap.iter_vnodes() {
623                    if set_vnode.insert(vnode) {
624                        new_set_vnode.push(vnode);
625                    }
626                }
627                if !new_set_vnode.is_empty() {
628                    let mut builder = BitmapBuilder::zeroed(vnode_watermark.vnode_count());
629                    for vnode in new_set_vnode {
630                        builder.set(vnode.to_index(), true);
631                    }
632                    let bitmap = Arc::new(builder.finish());
633                    new_vnode_watermarks.push(VnodeWatermark {
634                        vnode_bitmap: bitmap,
635                        watermark: vnode_watermark.watermark.clone(),
636                    })
637                }
638            }
639            if !new_vnode_watermarks.is_empty() {
640                if let Some((last_epoch, last_watermarks)) = result_epoch_watermark.last_mut()
641                    && *last_epoch == safe_epoch
642                {
643                    *last_watermarks = Arc::from(
644                        last_watermarks
645                            .iter()
646                            .cloned()
647                            .chain(new_vnode_watermarks.into_iter())
648                            .collect_vec(),
649                    );
650                } else {
651                    result_epoch_watermark.push((safe_epoch, Arc::from(new_vnode_watermarks)));
652                }
653            }
654        }
655        // epoch watermark are added from later epoch to earlier epoch.
656        // reverse to ensure that earlier epochs are at the front
657        result_epoch_watermark.reverse();
658        assert!(
659            result_epoch_watermark
660                .is_sorted_by(|(first_epoch, _), (second_epoch, _)| { first_epoch < second_epoch })
661        );
662        *self = TableWatermarks {
663            watermarks: result_epoch_watermark,
664            direction: self.direction,
665            watermark_type: self.watermark_type,
666        }
667    }
668}
669
670impl TableWatermarks {
671    pub fn estimated_encode_len(&self) -> usize {
672        self.watermarks.len() * size_of::<HummockEpoch>()
673            + self
674                .watermarks
675                .iter()
676                .map(|(_, watermarks)| {
677                    watermarks
678                        .iter()
679                        .map(|watermark| watermark.estimated_size())
680                        .sum::<usize>()
681                })
682                .sum::<usize>()
683            + size_of::<bool>() // for direction
684    }
685
686    pub fn to_protobuf(&self) -> PbTableWatermarks {
687        self.into()
688    }
689}
690
691impl From<&PbTableWatermarks> for TableWatermarks {
692    fn from(pb: &PbTableWatermarks) -> Self {
693        Self {
694            watermarks: pb
695                .epoch_watermarks
696                .iter()
697                .map(|epoch_watermark| {
698                    let epoch = epoch_watermark.epoch;
699                    let watermarks = epoch_watermark
700                        .watermarks
701                        .iter()
702                        .map(VnodeWatermark::from)
703                        .collect();
704                    (epoch, watermarks)
705                })
706                .collect(),
707            direction: if pb.is_ascending {
708                WatermarkDirection::Ascending
709            } else {
710                WatermarkDirection::Descending
711            },
712            watermark_type: if pb.is_non_pk_prefix {
713                WatermarkSerdeType::NonPkPrefix
714            } else {
715                WatermarkSerdeType::PkPrefix
716            },
717        }
718    }
719}
720
721impl From<&TableWatermarks> for PbTableWatermarks {
722    fn from(table_watermarks: &TableWatermarks) -> Self {
723        Self {
724            epoch_watermarks: table_watermarks
725                .watermarks
726                .iter()
727                .map(|(epoch, watermarks)| PbEpochNewWatermarks {
728                    watermarks: watermarks.iter().map(|wm| wm.into()).collect(),
729                    epoch: *epoch,
730                })
731                .collect(),
732            is_ascending: match table_watermarks.direction {
733                WatermarkDirection::Ascending => true,
734                WatermarkDirection::Descending => false,
735            },
736            is_non_pk_prefix: match table_watermarks.watermark_type {
737                WatermarkSerdeType::NonPkPrefix => true,
738                WatermarkSerdeType::PkPrefix => false,
739            },
740        }
741    }
742}
743
744impl From<PbTableWatermarks> for TableWatermarks {
745    fn from(pb: PbTableWatermarks) -> Self {
746        Self {
747            watermarks: pb
748                .epoch_watermarks
749                .into_iter()
750                .map(|epoch_watermark| {
751                    let epoch = epoch_watermark.epoch;
752                    let watermarks = epoch_watermark
753                        .watermarks
754                        .into_iter()
755                        .map(VnodeWatermark::from)
756                        .collect();
757                    (epoch, watermarks)
758                })
759                .collect(),
760            direction: if pb.is_ascending {
761                WatermarkDirection::Ascending
762            } else {
763                WatermarkDirection::Descending
764            },
765            watermark_type: if pb.is_non_pk_prefix {
766                WatermarkSerdeType::NonPkPrefix
767            } else {
768                WatermarkSerdeType::PkPrefix
769            },
770        }
771    }
772}
773
774impl From<TableWatermarks> for PbTableWatermarks {
775    fn from(table_watermarks: TableWatermarks) -> Self {
776        Self {
777            epoch_watermarks: table_watermarks
778                .watermarks
779                .into_iter()
780                .map(|(epoch, watermarks)| PbEpochNewWatermarks {
781                    watermarks: watermarks.iter().map(PbVnodeWatermark::from).collect(),
782                    epoch,
783                })
784                .collect(),
785            is_ascending: match table_watermarks.direction {
786                WatermarkDirection::Ascending => true,
787                WatermarkDirection::Descending => false,
788            },
789            is_non_pk_prefix: match table_watermarks.watermark_type {
790                WatermarkSerdeType::NonPkPrefix => true,
791                WatermarkSerdeType::PkPrefix => false,
792            },
793        }
794    }
795}
796
/// Kind of a table watermark; encoded in protobuf as the `is_non_pk_prefix` flag.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WatermarkSerdeType {
    /// Encoded with `is_non_pk_prefix == false`.
    PkPrefix,
    /// Encoded with `is_non_pk_prefix == true`.
    NonPkPrefix,
}
802
803#[cfg(test)]
804mod tests {
805    use std::collections::Bound::Included;
806    use std::collections::{Bound, HashMap};
807    use std::ops::Bound::Excluded;
808    use std::sync::Arc;
809    use std::vec;
810
811    use bytes::Bytes;
812    use itertools::Itertools;
813    use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
814    use risingwave_common::catalog::TableId;
815    use risingwave_common::hash::VirtualNode;
816    use risingwave_common::util::epoch::{EpochExt, test_epoch};
817    use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo};
818
819    use crate::compaction_group::StaticCompactionGroupId;
820    use crate::key::{TableKeyRange, is_empty_key_range, prefixed_range_with_vnode};
821    use crate::table_watermark::{
822        PkPrefixTableWatermarksIndex, TableWatermarks, VnodeWatermark, WatermarkDirection,
823        WatermarkSerdeType, merge_multiple_new_table_watermarks,
824    };
825    use crate::version::HummockVersion;
826
827    fn build_bitmap(vnodes: impl IntoIterator<Item = usize>) -> Arc<Bitmap> {
828        let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
829        for vnode in vnodes {
830            builder.set(vnode, true);
831        }
832        Arc::new(builder.finish())
833    }
834
835    #[test]
836    fn test_apply_new_table_watermark() {
837        let epoch1 = test_epoch(1);
838        let direction = WatermarkDirection::Ascending;
839        let watermark1 = Bytes::from("watermark1");
840        let watermark2 = Bytes::from("watermark2");
841        let watermark3 = Bytes::from("watermark3");
842        let watermark4 = Bytes::from("watermark4");
843        let watermark_type = WatermarkSerdeType::PkPrefix;
844        let mut table_watermarks = TableWatermarks::single_epoch(
845            epoch1,
846            vec![VnodeWatermark::new(build_bitmap(vec![0, 1, 2]), watermark1)],
847            direction,
848            watermark_type,
849        );
850        let epoch2 = epoch1.next_epoch();
851        table_watermarks.add_new_epoch_watermarks(
852            epoch2,
853            vec![VnodeWatermark::new(
854                build_bitmap(vec![0, 1, 2, 3]),
855                watermark2,
856            )]
857            .into(),
858            direction,
859            watermark_type,
860        );
861
862        let mut table_watermark_checkpoint = table_watermarks.clone();
863
864        let epoch3 = epoch2.next_epoch();
865        let mut second_table_watermark = TableWatermarks::single_epoch(
866            epoch3,
867            vec![VnodeWatermark::new(
868                build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
869                watermark3.clone(),
870            )],
871            direction,
872            watermark_type,
873        );
874        table_watermarks.add_new_epoch_watermarks(
875            epoch3,
876            vec![VnodeWatermark::new(
877                build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
878                watermark3,
879            )]
880            .into(),
881            direction,
882            watermark_type,
883        );
884        let epoch4 = epoch3.next_epoch();
885        let epoch5 = epoch4.next_epoch();
886        table_watermarks.add_new_epoch_watermarks(
887            epoch5,
888            vec![VnodeWatermark::new(
889                build_bitmap(vec![0, 3, 4]),
890                watermark4.clone(),
891            )]
892            .into(),
893            direction,
894            watermark_type,
895        );
896        second_table_watermark.add_new_epoch_watermarks(
897            epoch5,
898            vec![VnodeWatermark::new(build_bitmap(vec![0, 3, 4]), watermark4)].into(),
899            direction,
900            watermark_type,
901        );
902
903        table_watermark_checkpoint.apply_new_table_watermarks(&second_table_watermark);
904        assert_eq!(table_watermarks, table_watermark_checkpoint);
905    }
906
907    #[test]
908    fn test_clear_stale_epoch_watmermark() {
909        let epoch1 = test_epoch(1);
910        let direction = WatermarkDirection::Ascending;
911        let watermark1 = Bytes::from("watermark1");
912        let watermark2 = Bytes::from("watermark2");
913        let watermark3 = Bytes::from("watermark3");
914        let watermark4 = Bytes::from("watermark4");
915        let watermark_type = WatermarkSerdeType::PkPrefix;
916        let mut table_watermarks = TableWatermarks::single_epoch(
917            epoch1,
918            vec![VnodeWatermark::new(build_bitmap(vec![0, 1, 2]), watermark1)],
919            direction,
920            watermark_type,
921        );
922        let epoch2 = epoch1.next_epoch();
923        table_watermarks.add_new_epoch_watermarks(
924            epoch2,
925            vec![VnodeWatermark::new(
926                build_bitmap(vec![0, 1, 2, 3]),
927                watermark2.clone(),
928            )]
929            .into(),
930            direction,
931            watermark_type,
932        );
933        let epoch3 = epoch2.next_epoch();
934        table_watermarks.add_new_epoch_watermarks(
935            epoch3,
936            vec![VnodeWatermark::new(
937                build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
938                watermark3.clone(),
939            )]
940            .into(),
941            direction,
942            watermark_type,
943        );
944        let epoch4 = epoch3.next_epoch();
945        let epoch5 = epoch4.next_epoch();
946        table_watermarks.add_new_epoch_watermarks(
947            epoch5,
948            vec![VnodeWatermark::new(
949                build_bitmap(vec![0, 3, 4]),
950                watermark4.clone(),
951            )]
952            .into(),
953            direction,
954            watermark_type,
955        );
956
957        let mut table_watermarks_checkpoint = table_watermarks.clone();
958        table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch1);
959        assert_eq!(table_watermarks_checkpoint, table_watermarks);
960
961        table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch2);
962        assert_eq!(
963            table_watermarks_checkpoint,
964            TableWatermarks {
965                watermarks: vec![
966                    (
967                        epoch2,
968                        vec![VnodeWatermark::new(
969                            build_bitmap(vec![0, 1, 2, 3]),
970                            watermark2,
971                        )]
972                        .into()
973                    ),
974                    (
975                        epoch3,
976                        vec![VnodeWatermark::new(
977                            build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
978                            watermark3.clone(),
979                        )]
980                        .into()
981                    ),
982                    (
983                        epoch5,
984                        vec![VnodeWatermark::new(
985                            build_bitmap(vec![0, 3, 4]),
986                            watermark4.clone(),
987                        )]
988                        .into()
989                    )
990                ],
991                direction,
992                watermark_type,
993            }
994        );
995
996        table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch3);
997        assert_eq!(
998            table_watermarks_checkpoint,
999            TableWatermarks {
1000                watermarks: vec![
1001                    (
1002                        epoch3,
1003                        vec![VnodeWatermark::new(
1004                            build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
1005                            watermark3.clone(),
1006                        )]
1007                        .into()
1008                    ),
1009                    (
1010                        epoch5,
1011                        vec![VnodeWatermark::new(
1012                            build_bitmap(vec![0, 3, 4]),
1013                            watermark4.clone(),
1014                        )]
1015                        .into()
1016                    )
1017                ],
1018                direction,
1019                watermark_type,
1020            }
1021        );
1022
1023        table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch4);
1024        assert_eq!(
1025            table_watermarks_checkpoint,
1026            TableWatermarks {
1027                watermarks: vec![
1028                    (
1029                        epoch4,
1030                        vec![VnodeWatermark::new(
1031                            build_bitmap((1..3).chain(5..VirtualNode::COUNT_FOR_TEST)),
1032                            watermark3.clone()
1033                        )]
1034                        .into()
1035                    ),
1036                    (
1037                        epoch5,
1038                        vec![VnodeWatermark::new(
1039                            build_bitmap(vec![0, 3, 4]),
1040                            watermark4.clone(),
1041                        )]
1042                        .into()
1043                    )
1044                ],
1045                direction,
1046                watermark_type,
1047            }
1048        );
1049
1050        table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch5);
1051        assert_eq!(
1052            table_watermarks_checkpoint,
1053            TableWatermarks {
1054                watermarks: vec![(
1055                    epoch5,
1056                    vec![
1057                        VnodeWatermark::new(build_bitmap(vec![0, 3, 4]), watermark4),
1058                        VnodeWatermark::new(
1059                            build_bitmap((1..3).chain(5..VirtualNode::COUNT_FOR_TEST)),
1060                            watermark3
1061                        )
1062                    ]
1063                    .into()
1064                )],
1065                direction,
1066                watermark_type,
1067            }
1068        );
1069    }
1070
1071    #[test]
1072    fn test_merge_multiple_new_table_watermarks() {
1073        fn epoch_new_watermark(epoch: u64, bitmaps: Vec<&Bitmap>) -> (u64, Arc<[VnodeWatermark]>) {
1074            (
1075                epoch,
1076                bitmaps
1077                    .into_iter()
1078                    .map(|bitmap| VnodeWatermark {
1079                        watermark: Bytes::from(vec![1, 2, epoch as _]),
1080                        vnode_bitmap: Arc::new(bitmap.clone()),
1081                    })
1082                    .collect_vec()
1083                    .into(),
1084            )
1085        }
1086        fn build_table_watermark(
1087            vnodes: impl IntoIterator<Item = usize>,
1088            epochs: impl IntoIterator<Item = u64>,
1089        ) -> TableWatermarks {
1090            let bitmap = build_bitmap(vnodes);
1091            TableWatermarks {
1092                watermarks: epochs
1093                    .into_iter()
1094                    .map(|epoch: u64| epoch_new_watermark(epoch, vec![&bitmap]))
1095                    .collect(),
1096                direction: WatermarkDirection::Ascending,
1097                watermark_type: WatermarkSerdeType::PkPrefix,
1098            }
1099        }
1100        let table1_watermark1 = build_table_watermark(0..3, vec![1, 2, 4]);
1101        let table1_watermark2 = build_table_watermark(4..6, vec![1, 2, 5]);
1102        let table2_watermark = build_table_watermark(0..4, 1..3);
1103        let table3_watermark = build_table_watermark(0..4, 3..5);
1104        let mut first = HashMap::new();
1105        first.insert(TableId::new(1), table1_watermark1);
1106        first.insert(TableId::new(2), table2_watermark.clone());
1107        let mut second = HashMap::new();
1108        second.insert(TableId::new(1), table1_watermark2);
1109        second.insert(TableId::new(3), table3_watermark.clone());
1110        let result = merge_multiple_new_table_watermarks(vec![first, second]);
1111        let mut expected = HashMap::new();
1112        expected.insert(
1113            TableId::new(1),
1114            TableWatermarks {
1115                watermarks: vec![
1116                    epoch_new_watermark(1, vec![&build_bitmap(0..3), &build_bitmap(4..6)]),
1117                    epoch_new_watermark(2, vec![&build_bitmap(0..3), &build_bitmap(4..6)]),
1118                    epoch_new_watermark(4, vec![&build_bitmap(0..3)]),
1119                    epoch_new_watermark(5, vec![&build_bitmap(4..6)]),
1120                ],
1121                direction: WatermarkDirection::Ascending,
1122                watermark_type: WatermarkSerdeType::PkPrefix,
1123            },
1124        );
1125        expected.insert(TableId::new(2), table2_watermark);
1126        expected.insert(TableId::new(3), table3_watermark);
1127        assert_eq!(result, expected);
1128    }
1129
    /// Epoch handed to `PkPrefixTableWatermarksIndex::new` as the committed epoch.
    const COMMITTED_EPOCH: u64 = test_epoch(1);
    /// Epoch of the first watermark added to the test index.
    const EPOCH1: u64 = test_epoch(2);
    /// Epoch of the second watermark added to the test index.
    const EPOCH2: u64 = test_epoch(3);
    /// Vnode used to prefix the key ranges built by `build_watermark_range`.
    const TEST_SINGLE_VNODE: VirtualNode = VirtualNode::from_index(1);
1134
1135    fn build_watermark_range(
1136        direction: WatermarkDirection,
1137        (low, high): (Bound<Bytes>, Bound<Bytes>),
1138    ) -> TableKeyRange {
1139        let range = match direction {
1140            WatermarkDirection::Ascending => (low, high),
1141            WatermarkDirection::Descending => (high, low),
1142        };
1143        prefixed_range_with_vnode(range, TEST_SINGLE_VNODE)
1144    }
1145
1146    /// Build and return a watermark index with the following watermarks
1147    /// EPOCH1 bitmap(0, 1, 2, 3) watermark1
1148    /// EPOCH2 bitmap(1, 2, 3, 4) watermark2
1149    fn build_and_test_watermark_index(
1150        direction: WatermarkDirection,
1151        watermark1: Bytes,
1152        watermark2: Bytes,
1153        watermark3: Bytes,
1154    ) -> PkPrefixTableWatermarksIndex {
1155        let mut index = PkPrefixTableWatermarksIndex::new(
1156            direction,
1157            EPOCH1,
1158            vec![VnodeWatermark::new(build_bitmap(0..4), watermark1.clone())],
1159            Some(COMMITTED_EPOCH),
1160        );
1161        index.add_epoch_watermark(
1162            EPOCH2,
1163            vec![VnodeWatermark::new(build_bitmap(1..5), watermark2.clone())].into(),
1164            direction,
1165        );
1166
1167        assert_eq!(
1168            index.read_watermark(VirtualNode::from_index(0), EPOCH1),
1169            Some(watermark1.clone())
1170        );
1171        assert_eq!(
1172            index.read_watermark(VirtualNode::from_index(1), EPOCH1),
1173            Some(watermark1.clone())
1174        );
1175        assert_eq!(
1176            index.read_watermark(VirtualNode::from_index(4), EPOCH1),
1177            None
1178        );
1179        assert_eq!(
1180            index.read_watermark(VirtualNode::from_index(0), EPOCH2),
1181            Some(watermark1.clone())
1182        );
1183        assert_eq!(
1184            index.read_watermark(VirtualNode::from_index(1), EPOCH2),
1185            Some(watermark2.clone())
1186        );
1187        assert_eq!(
1188            index.read_watermark(VirtualNode::from_index(4), EPOCH2),
1189            Some(watermark2.clone())
1190        );
1191        assert_eq!(
1192            index.latest_watermark(VirtualNode::from_index(0)),
1193            Some(watermark1.clone())
1194        );
1195        assert_eq!(
1196            index.latest_watermark(VirtualNode::from_index(1)),
1197            Some(watermark2.clone())
1198        );
1199        assert_eq!(
1200            index.latest_watermark(VirtualNode::from_index(4)),
1201            Some(watermark2.clone())
1202        );
1203
1204        // watermark is watermark2
1205        let check_watermark_range =
1206            |query_range: (Bound<Bytes>, Bound<Bytes>),
1207             output_range: Option<(Bound<Bytes>, Bound<Bytes>)>| {
1208                let mut range = build_watermark_range(direction, query_range);
1209                index.rewrite_range_with_table_watermark(EPOCH2, &mut range);
1210                if let Some(output_range) = output_range {
1211                    assert_eq!(range, build_watermark_range(direction, output_range));
1212                } else {
1213                    assert!(is_empty_key_range(&range));
1214                }
1215            };
1216
1217        // test read from single vnode and truncate begin key range
1218        check_watermark_range(
1219            (Included(watermark1.clone()), Excluded(watermark3.clone())),
1220            Some((Included(watermark2.clone()), Excluded(watermark3.clone()))),
1221        );
1222
1223        // test read from single vnode and begin key right at watermark
1224        check_watermark_range(
1225            (Included(watermark2.clone()), Excluded(watermark3.clone())),
1226            Some((Included(watermark2.clone()), Excluded(watermark3.clone()))),
1227        );
1228        check_watermark_range(
1229            (Excluded(watermark2.clone()), Excluded(watermark3.clone())),
1230            Some((Excluded(watermark2.clone()), Excluded(watermark3))),
1231        );
1232
1233        // test read from single vnode and end key right at watermark
1234        check_watermark_range(
1235            (Excluded(watermark1.clone()), Excluded(watermark2.clone())),
1236            None,
1237        );
1238        check_watermark_range(
1239            (Excluded(watermark1), Included(watermark2.clone())),
1240            Some((Included(watermark2.clone()), Included(watermark2))),
1241        );
1242
1243        index
1244    }
1245
1246    #[test]
1247    fn test_watermark_index_ascending() {
1248        let watermark1 = Bytes::from_static(b"watermark1");
1249        let watermark2 = Bytes::from_static(b"watermark2");
1250        let watermark3 = Bytes::from_static(b"watermark3");
1251        build_and_test_watermark_index(
1252            WatermarkDirection::Ascending,
1253            watermark1,
1254            watermark2,
1255            watermark3,
1256        );
1257    }
1258
1259    #[test]
1260    fn test_watermark_index_descending() {
1261        let watermark1 = Bytes::from_static(b"watermark254");
1262        let watermark2 = Bytes::from_static(b"watermark253");
1263        let watermark3 = Bytes::from_static(b"watermark252");
1264        build_and_test_watermark_index(
1265            WatermarkDirection::Descending,
1266            watermark1,
1267            watermark2,
1268            watermark3,
1269        );
1270    }
1271
1272    #[test]
1273    fn test_apply_committed_index() {
1274        let watermark1 = Bytes::from_static(b"watermark1");
1275        let watermark2 = Bytes::from_static(b"watermark2");
1276        let watermark3 = Bytes::from_static(b"watermark3");
1277        let mut index = build_and_test_watermark_index(
1278            WatermarkDirection::Ascending,
1279            watermark1.clone(),
1280            watermark2.clone(),
1281            watermark3,
1282        );
1283
1284        let test_table_id = TableId::from(233);
1285
1286        let mut version = HummockVersion::from_rpc_protobuf(&PbHummockVersion {
1287            state_table_info: HashMap::from_iter([(
1288                test_table_id.table_id,
1289                StateTableInfo {
1290                    committed_epoch: EPOCH1,
1291                    compaction_group_id: StaticCompactionGroupId::StateDefault as _,
1292                },
1293            )]),
1294            ..Default::default()
1295        });
1296        version.table_watermarks.insert(
1297            test_table_id,
1298            TableWatermarks {
1299                watermarks: vec![(
1300                    EPOCH1,
1301                    vec![VnodeWatermark {
1302                        watermark: watermark1.clone(),
1303                        vnode_bitmap: build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
1304                    }]
1305                    .into(),
1306                )],
1307                direction: WatermarkDirection::Ascending,
1308                watermark_type: WatermarkSerdeType::PkPrefix,
1309            }
1310            .into(),
1311        );
1312        index.apply_committed_watermarks(
1313            version
1314                .table_watermarks
1315                .get(&test_table_id)
1316                .unwrap()
1317                .clone(),
1318            EPOCH1,
1319        );
1320        assert_eq!(EPOCH1, index.committed_epoch.unwrap());
1321        assert_eq!(EPOCH2, index.latest_epoch);
1322        for vnode in 0..VirtualNode::COUNT_FOR_TEST {
1323            let vnode = VirtualNode::from_index(vnode);
1324            if (1..5).contains(&vnode.to_index()) {
1325                assert_eq!(watermark1, index.read_watermark(vnode, EPOCH1).unwrap());
1326                assert_eq!(watermark2, index.read_watermark(vnode, EPOCH2).unwrap());
1327            } else {
1328                assert_eq!(watermark1, index.read_watermark(vnode, EPOCH1).unwrap());
1329            }
1330        }
1331    }
1332
1333    #[test]
1334    fn test_filter_regress_watermark() {
1335        let watermark1 = Bytes::from_static(b"watermark1");
1336        let watermark2 = Bytes::from_static(b"watermark2");
1337        let watermark3 = Bytes::from_static(b"watermark3");
1338        let index = build_and_test_watermark_index(
1339            WatermarkDirection::Ascending,
1340            watermark1.clone(),
1341            watermark2,
1342            watermark3.clone(),
1343        );
1344
1345        let mut new_watermarks = vec![
1346            // Partial regress
1347            VnodeWatermark {
1348                vnode_bitmap: build_bitmap(0..2),
1349                watermark: watermark1.clone(),
1350            },
1351            // All not regress
1352            VnodeWatermark {
1353                vnode_bitmap: build_bitmap(2..4),
1354                watermark: watermark3.clone(),
1355            },
1356            // All regress
1357            VnodeWatermark {
1358                vnode_bitmap: build_bitmap(4..5),
1359                watermark: watermark1.clone(),
1360            },
1361            // All newly set vnode
1362            VnodeWatermark {
1363                vnode_bitmap: build_bitmap(5..6),
1364                watermark: watermark3.clone(),
1365            },
1366        ];
1367
1368        index.filter_regress_watermarks(&mut new_watermarks);
1369
1370        assert_eq!(
1371            new_watermarks,
1372            vec![
1373                VnodeWatermark {
1374                    vnode_bitmap: build_bitmap(0..1),
1375                    watermark: watermark1,
1376                },
1377                VnodeWatermark {
1378                    vnode_bitmap: build_bitmap(2..4),
1379                    watermark: watermark3.clone(),
1380                },
1381                VnodeWatermark {
1382                    vnode_bitmap: build_bitmap(5..6),
1383                    watermark: watermark3,
1384                },
1385            ]
1386        );
1387    }
1388}