1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
17use std::fmt::Display;
18use std::mem::size_of;
19use std::ops::Bound::{Excluded, Included, Unbounded};
20use std::sync::Arc;
21
22use bytes::Bytes;
23use itertools::Itertools;
24use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
25use risingwave_common::catalog::TableId;
26use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
27use risingwave_common::types::ToDatumRef;
28use risingwave_common::util::sort_util::{OrderType, cmp_datum};
29use risingwave_common_estimate_size::EstimateSize;
30use risingwave_pb::hummock::table_watermarks::PbEpochNewWatermarks;
31use risingwave_pb::hummock::{
32 PbVnodeWatermark, PbWatermarkSerdeType, TableWatermarks as PbTableWatermarks,
33};
34use tracing::{debug, warn};
35
36use crate::HummockEpoch;
37use crate::key::{TableKey, TableKeyRange, prefix_slice_with_vnode, vnode};
38
/// Per-vnode watermarks of a table together with the direction they apply in.
///
/// Each entry of `vnode_watermarks` maps a vnode to its encoded watermark bytes; how a
/// watermark filters keys is determined by `direction`
/// (see `WatermarkDirection::key_filter_by_watermark`).
#[derive(Clone)]
pub struct ReadTableWatermark {
    pub direction: WatermarkDirection,
    pub vnode_watermarks: BTreeMap<VirtualNode, Bytes>,
}
44
/// In-memory index over the `PkPrefix` watermarks of one table.
///
/// Watermarks live in two tiers:
/// - `staging_watermarks`: epochs newer than `committed_epoch`, oldest at the front. They are
///   appended with strictly increasing epochs by `add_epoch_watermark` and trimmed from the
///   front by `apply_committed_watermarks`.
/// - `committed_watermarks`: the committed snapshot, replaced as a whole by
///   `apply_committed_watermarks`.
///
/// Lookups consult staging entries before committed ones, newest first.
#[derive(Clone)]
pub struct PkPrefixTableWatermarksIndex {
    pub watermark_direction: WatermarkDirection,
    pub staging_watermarks: VecDeque<(HummockEpoch, Arc<[VnodeWatermark]>)>,
    // `None` until a committed snapshot is provided (e.g. an index built with `new`).
    pub committed_watermarks: Option<Arc<TableWatermarks>>,
    // Largest epoch recorded so far; raised by `add_epoch_watermark`, and by
    // `apply_committed_watermarks` when the committed epoch is newer.
    latest_epoch: HummockEpoch,
    // Epoch of the most recently applied committed snapshot, if any.
    committed_epoch: Option<HummockEpoch>,
}
54
55impl PkPrefixTableWatermarksIndex {
56 pub fn new(
57 watermark_direction: WatermarkDirection,
58 first_epoch: HummockEpoch,
59 first_vnode_watermark: Vec<VnodeWatermark>,
60 committed_epoch: Option<HummockEpoch>,
61 ) -> Self {
62 if let Some(committed_epoch) = committed_epoch {
63 assert!(first_epoch > committed_epoch);
64 }
65 Self {
66 watermark_direction,
67 staging_watermarks: VecDeque::from_iter([(
68 first_epoch,
69 Arc::from(first_vnode_watermark),
70 )]),
71 committed_watermarks: None,
72 latest_epoch: first_epoch,
73 committed_epoch,
74 }
75 }
76
77 pub fn new_committed(
78 committed_watermarks: Arc<TableWatermarks>,
79 committed_epoch: HummockEpoch,
80 ) -> Self {
81 assert_eq!(
82 committed_watermarks.watermark_type,
83 WatermarkSerdeType::PkPrefix
84 );
85 Self {
86 watermark_direction: committed_watermarks.direction,
87 staging_watermarks: VecDeque::new(),
88 committed_epoch: Some(committed_epoch),
89 latest_epoch: committed_epoch,
90 committed_watermarks: Some(committed_watermarks),
91 }
92 }
93
94 pub fn read_watermark(&self, vnode: VirtualNode, epoch: HummockEpoch) -> Option<Bytes> {
95 for (watermark_epoch, vnode_watermark_list) in self.staging_watermarks.iter().rev().chain(
97 self.committed_watermarks
98 .iter()
99 .flat_map(|watermarks| watermarks.watermarks.iter().rev()),
100 ) {
101 if *watermark_epoch > epoch {
102 continue;
103 }
104 for vnode_watermark in vnode_watermark_list.as_ref() {
105 if vnode_watermark.vnode_bitmap.is_set(vnode.to_index()) {
106 return Some(vnode_watermark.watermark.clone());
107 }
108 }
109 }
110 None
111 }
112
113 pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
114 self.read_watermark(vnode, HummockEpoch::MAX)
115 }
116
117 pub fn rewrite_range_with_table_watermark(
118 &self,
119 epoch: HummockEpoch,
120 key_range: &mut TableKeyRange,
121 ) {
122 let vnode = vnode(key_range);
123 if let Some(watermark) = self.read_watermark(vnode, epoch) {
124 match self.watermark_direction {
125 WatermarkDirection::Ascending => {
126 let overwrite_start_key = match &key_range.0 {
127 Included(start_key) | Excluded(start_key) => {
128 start_key.key_part() < watermark
129 }
130 Unbounded => true,
131 };
132 if overwrite_start_key {
133 let watermark_key = TableKey(prefix_slice_with_vnode(vnode, &watermark));
134 let fully_filtered = match &key_range.1 {
135 Included(end_key) => end_key < &watermark_key,
136 Excluded(end_key) => end_key <= &watermark_key,
137 Unbounded => false,
138 };
139 if fully_filtered {
140 key_range.1 = Excluded(watermark_key.clone());
141 }
142 key_range.0 = Included(watermark_key);
143 }
144 }
145 WatermarkDirection::Descending => {
146 let overwrite_end_key = match &key_range.1 {
147 Included(end_key) | Excluded(end_key) => end_key.key_part() > watermark,
148 Unbounded => true,
149 };
150 if overwrite_end_key {
151 let watermark_key = TableKey(prefix_slice_with_vnode(vnode, &watermark));
152 let fully_filtered = match &key_range.0 {
153 Included(start_key) => start_key > &watermark_key,
154 Excluded(start_key) => start_key >= &watermark_key,
155 Unbounded => false,
156 };
157 if fully_filtered {
158 *key_range = (Included(watermark_key.clone()), Excluded(watermark_key));
159 } else {
160 key_range.1 = Included(watermark_key);
161 }
162 }
163 }
164 }
165 }
166 }
167
168 pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
169 let mut ret = Vec::with_capacity(watermarks.len());
170 for watermark in watermarks.drain(..) {
171 let vnode_count = watermark.vnode_count();
172
173 let mut regress_vnodes = None;
174 for vnode in watermark.vnode_bitmap.iter_vnodes() {
175 if let Some(prev_watermark) = self.latest_watermark(vnode) {
176 let is_regress = match self.direction() {
177 WatermarkDirection::Ascending => prev_watermark > watermark.watermark,
178 WatermarkDirection::Descending => prev_watermark < watermark.watermark,
179 };
180 if is_regress {
181 warn!(
182 "table watermark regress: {:?} {} {:?} {:?}",
183 self.direction(),
184 vnode,
185 watermark.watermark,
186 prev_watermark
187 );
188 regress_vnodes
189 .get_or_insert_with(|| BitmapBuilder::zeroed(vnode_count))
190 .set(vnode.to_index(), true);
191 }
192 }
193 }
194 if let Some(regress_vnodes) = regress_vnodes {
195 let mut bitmap_builder = None;
196 for vnode in watermark.vnode_bitmap.iter_vnodes() {
197 let vnode_index = vnode.to_index();
198 if !regress_vnodes.is_set(vnode_index) {
199 bitmap_builder
200 .get_or_insert_with(|| BitmapBuilder::zeroed(vnode_count))
201 .set(vnode_index, true);
202 }
203 }
204 if let Some(bitmap_builder) = bitmap_builder {
205 ret.push(VnodeWatermark::new(
206 Arc::new(bitmap_builder.finish()),
207 watermark.watermark,
208 ));
209 }
210 } else {
211 ret.push(watermark);
213 }
214 }
215 *watermarks = ret;
216 }
217
218 pub fn direction(&self) -> WatermarkDirection {
219 self.watermark_direction
220 }
221
222 pub fn add_epoch_watermark(
223 &mut self,
224 epoch: HummockEpoch,
225 vnode_watermark_list: Arc<[VnodeWatermark]>,
226 direction: WatermarkDirection,
227 ) {
228 assert!(epoch > self.latest_epoch);
229 assert_eq!(self.watermark_direction, direction);
230 self.latest_epoch = epoch;
231 #[cfg(debug_assertions)]
232 if !vnode_watermark_list.is_empty() {
233 let vnode_count = vnode_watermark_list[0].vnode_count();
234 let mut vnode_is_set = BitmapBuilder::zeroed(vnode_count);
235 for vnode_watermark in vnode_watermark_list.as_ref() {
236 for vnode in vnode_watermark.vnode_bitmap.iter_ones() {
237 assert!(!vnode_is_set.is_set(vnode));
238 vnode_is_set.set(vnode, true);
239 let vnode = VirtualNode::from_index(vnode);
240 if let Some(prev_watermark) = self.latest_watermark(vnode) {
241 match self.watermark_direction {
242 WatermarkDirection::Ascending => {
243 assert!(vnode_watermark.watermark >= prev_watermark);
244 }
245 WatermarkDirection::Descending => {
246 assert!(vnode_watermark.watermark <= prev_watermark);
247 }
248 };
249 }
250 }
251 }
252 }
253 self.staging_watermarks
254 .push_back((epoch, vnode_watermark_list));
255 }
256
257 pub fn apply_committed_watermarks(
258 &mut self,
259 committed_watermark: Arc<TableWatermarks>,
260 committed_epoch: HummockEpoch,
261 ) {
262 assert_eq!(
263 committed_watermark.watermark_type,
264 WatermarkSerdeType::PkPrefix
265 );
266 assert_eq!(self.watermark_direction, committed_watermark.direction);
267 if let Some(prev_committed_epoch) = self.committed_epoch {
268 assert!(prev_committed_epoch <= committed_epoch);
269 if prev_committed_epoch == committed_epoch {
270 return;
271 }
272 }
273 if self.latest_epoch < committed_epoch {
274 debug!(
275 latest_epoch = self.latest_epoch,
276 committed_epoch, "committed_epoch exceed table watermark latest_epoch"
277 );
278 self.latest_epoch = committed_epoch;
279 }
280 self.committed_epoch = Some(committed_epoch);
281 self.committed_watermarks = Some(committed_watermark);
282 while let Some((old_epoch, _)) = self.staging_watermarks.front()
284 && *old_epoch <= committed_epoch
285 {
286 let _ = self.staging_watermarks.pop_front();
287 }
288 }
289}
290
/// Direction in which a table watermark advances.
///
/// With `Ascending`, keys below the watermark are filtered; with `Descending`, keys above it
/// are filtered (see `WatermarkDirection::key_filter_by_watermark`).
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum WatermarkDirection {
    Ascending,
    Descending,
}

impl Display for WatermarkDirection {
    /// Writes the variant name, e.g. `Ascending`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::Ascending => "Ascending",
            Self::Descending => "Descending",
        };
        f.write_str(name)
    }
}
305
306impl WatermarkDirection {
307 pub fn key_filter_by_watermark(
308 &self,
309 key: impl AsRef<[u8]>,
310 watermark: impl AsRef<[u8]>,
311 ) -> bool {
312 let key = key.as_ref();
313 let watermark = watermark.as_ref();
314 match self {
315 WatermarkDirection::Ascending => key < watermark,
316 WatermarkDirection::Descending => key > watermark,
317 }
318 }
319
320 pub fn datum_filter_by_watermark(
321 &self,
322 watermark_col_in_pk: impl ToDatumRef,
323 watermark: impl ToDatumRef,
324 order_type: OrderType,
325 ) -> bool {
326 let watermark_col_in_pk = watermark_col_in_pk.to_datum_ref();
327 let watermark = watermark.to_datum_ref();
328 match self {
329 WatermarkDirection::Ascending => {
330 cmp_datum(watermark_col_in_pk, watermark, order_type).is_lt()
332 }
333 WatermarkDirection::Descending => {
334 cmp_datum(watermark_col_in_pk, watermark, order_type).is_gt()
336 }
337 }
338 }
339
340 pub fn is_ascending(&self) -> bool {
341 match self {
342 WatermarkDirection::Ascending => true,
343 WatermarkDirection::Descending => false,
344 }
345 }
346}
347
348#[derive(Clone, Debug, PartialEq, EstimateSize)]
349pub struct VnodeWatermark {
350 vnode_bitmap: Arc<Bitmap>,
351 watermark: Bytes,
352}
353
354impl VnodeWatermark {
355 pub fn new(vnode_bitmap: Arc<Bitmap>, watermark: Bytes) -> Self {
356 Self {
357 vnode_bitmap,
358 watermark,
359 }
360 }
361
362 pub fn vnode_bitmap(&self) -> &Bitmap {
363 &self.vnode_bitmap
364 }
365
366 pub fn vnode_count(&self) -> usize {
368 self.vnode_bitmap.len()
369 }
370
371 pub fn watermark(&self) -> &Bytes {
372 &self.watermark
373 }
374
375 pub fn to_protobuf(&self) -> PbVnodeWatermark {
376 self.into()
377 }
378}
379
380impl From<PbVnodeWatermark> for VnodeWatermark {
381 fn from(pb: PbVnodeWatermark) -> Self {
382 Self {
383 vnode_bitmap: Arc::new(Bitmap::from(pb.vnode_bitmap.as_ref().unwrap())),
384 watermark: Bytes::from(pb.watermark),
385 }
386 }
387}
388
389impl From<&PbVnodeWatermark> for VnodeWatermark {
390 fn from(pb: &PbVnodeWatermark) -> Self {
391 Self {
392 vnode_bitmap: Arc::new(Bitmap::from(pb.vnode_bitmap.as_ref().unwrap())),
393 watermark: Bytes::from(pb.watermark.clone()),
394 }
395 }
396}
397
398impl From<VnodeWatermark> for PbVnodeWatermark {
399 fn from(watermark: VnodeWatermark) -> Self {
400 Self {
401 watermark: watermark.watermark.into(),
402 vnode_bitmap: Some(watermark.vnode_bitmap.to_protobuf()),
403 }
404 }
405}
406
407impl From<&VnodeWatermark> for PbVnodeWatermark {
408 fn from(watermark: &VnodeWatermark) -> Self {
409 Self {
410 watermark: watermark.watermark.to_vec(),
411 vnode_bitmap: Some(watermark.vnode_bitmap.to_protobuf()),
412 }
413 }
414}
415
416#[derive(Clone, Debug, PartialEq)]
417pub struct TableWatermarks {
418 pub watermarks: Vec<(HummockEpoch, Arc<[VnodeWatermark]>)>,
420 pub direction: WatermarkDirection,
421 pub watermark_type: WatermarkSerdeType,
422}
423
424impl TableWatermarks {
425 pub fn single_epoch(
426 epoch: HummockEpoch,
427 watermarks: Vec<VnodeWatermark>,
428 direction: WatermarkDirection,
429 watermark_type: WatermarkSerdeType,
430 ) -> Self {
431 let mut this = Self {
432 direction,
433 watermarks: Vec::new(),
434 watermark_type,
435 };
436 this.add_new_epoch_watermarks(epoch, watermarks.into(), direction, watermark_type);
437 this
438 }
439
440 pub fn add_new_epoch_watermarks(
441 &mut self,
442 epoch: HummockEpoch,
443 watermarks: Arc<[VnodeWatermark]>,
444 direction: WatermarkDirection,
445 watermark_type: WatermarkSerdeType,
446 ) {
447 assert_eq!(self.direction, direction);
448 assert_eq!(self.watermark_type, watermark_type);
449
450 if let Some((prev_epoch, _)) = self.watermarks.last() {
451 assert!(*prev_epoch < epoch);
452 }
453 if !watermarks.is_empty() {
454 let vnode_count = watermarks[0].vnode_count();
455 for watermark in &*watermarks {
456 assert_eq!(watermark.vnode_count(), vnode_count);
457 }
458 if let Some(existing_vnode_count) = self.vnode_count() {
459 assert_eq!(existing_vnode_count, vnode_count);
460 }
461 }
462 self.watermarks.push((epoch, watermarks));
463 }
464
465 fn vnode_count(&self) -> Option<usize> {
467 self.watermarks
468 .iter()
469 .flat_map(|(_, watermarks)| watermarks.as_ref())
470 .next()
471 .map(|w| w.vnode_count())
472 }
473}
474
475pub fn merge_multiple_new_table_watermarks(
476 table_watermarks_list: impl IntoIterator<Item = HashMap<TableId, TableWatermarks>>,
477) -> HashMap<TableId, TableWatermarks> {
478 #[allow(clippy::type_complexity)]
479 let mut ret: HashMap<
480 TableId,
481 (
482 WatermarkDirection,
483 BTreeMap<u64, Vec<VnodeWatermark>>,
484 WatermarkSerdeType,
485 ),
486 > = HashMap::new();
487 for table_watermarks in table_watermarks_list {
488 for (table_id, new_table_watermarks) in table_watermarks {
489 let epoch_watermarks = match ret.entry(table_id) {
490 Entry::Occupied(entry) => {
491 let (direction, epoch_watermarks, watermark_type) = entry.into_mut();
492 assert_eq!(&new_table_watermarks.direction, direction);
493 assert_eq!(&new_table_watermarks.watermark_type, watermark_type);
494 epoch_watermarks
495 }
496 Entry::Vacant(entry) => {
497 let (_, epoch_watermarks, _) = entry.insert((
498 new_table_watermarks.direction,
499 BTreeMap::new(),
500 new_table_watermarks.watermark_type,
501 ));
502 epoch_watermarks
503 }
504 };
505 for (new_epoch, new_epoch_watermarks) in new_table_watermarks.watermarks {
506 epoch_watermarks
507 .entry(new_epoch)
508 .or_insert_with(Vec::new)
509 .extend(new_epoch_watermarks.iter().cloned());
510 }
511 }
512 }
513 ret.into_iter()
514 .map(
515 |(table_id, (direction, epoch_watermarks, watermark_type))| {
516 (
517 table_id,
518 TableWatermarks {
519 direction,
520 watermarks: epoch_watermarks
522 .into_iter()
523 .map(|(epoch, watermarks)| (epoch, Arc::from(watermarks)))
524 .collect(),
525 watermark_type,
526 },
527 )
528 },
529 )
530 .collect()
531}
532
533impl TableWatermarks {
534 pub fn apply_new_table_watermarks(&mut self, newly_added_watermarks: &TableWatermarks) {
535 assert_eq!(self.direction, newly_added_watermarks.direction);
536 assert!(self.watermarks.iter().map(|(epoch, _)| epoch).is_sorted());
537 assert!(
538 newly_added_watermarks
539 .watermarks
540 .iter()
541 .map(|(epoch, _)| epoch)
542 .is_sorted()
543 );
544 if let Some((prev_last_epoch, _)) = self.watermarks.last()
546 && let Some((new_first_epoch, _)) = newly_added_watermarks.watermarks.first()
547 {
548 assert!(prev_last_epoch < new_first_epoch);
549 }
550 self.watermarks.extend(
551 newly_added_watermarks
552 .watermarks
553 .iter()
554 .map(|(epoch, new_watermarks)| (*epoch, new_watermarks.clone())),
555 );
556 }
557
558 pub fn clear_stale_epoch_watermark(&mut self, safe_epoch: u64) {
559 match self.watermarks.first() {
560 None => {
561 return;
563 }
564 Some((earliest_epoch, _)) => {
565 if *earliest_epoch >= safe_epoch {
566 return;
568 }
569 }
570 }
571 debug!("clear stale table watermark below epoch {}", safe_epoch);
572 let mut result_epoch_watermark = Vec::with_capacity(self.watermarks.len());
573 let mut set_vnode: HashSet<VirtualNode> = HashSet::new();
574 let mut vnode_count: Option<usize> = None; while let Some((epoch, _)) = self.watermarks.last() {
576 if *epoch >= safe_epoch {
577 let (epoch, watermarks) = self.watermarks.pop().expect("have check Some");
578 for watermark in watermarks.as_ref() {
579 vnode_count.get_or_insert_with(|| watermark.vnode_count());
580 for vnode in watermark.vnode_bitmap.iter_vnodes() {
581 set_vnode.insert(vnode);
582 }
583 }
584 result_epoch_watermark.push((epoch, watermarks));
585 } else {
586 break;
587 }
588 }
589 while vnode_count != Some(set_vnode.len())
590 && let Some((_, watermarks)) = self.watermarks.pop()
591 {
592 let mut new_vnode_watermarks = Vec::new();
593 for vnode_watermark in watermarks.as_ref() {
594 let mut new_set_vnode = Vec::new();
595 vnode_count.get_or_insert_with(|| vnode_watermark.vnode_count());
596 for vnode in vnode_watermark.vnode_bitmap.iter_vnodes() {
597 if set_vnode.insert(vnode) {
598 new_set_vnode.push(vnode);
599 }
600 }
601 if !new_set_vnode.is_empty() {
602 let mut builder = BitmapBuilder::zeroed(vnode_watermark.vnode_count());
603 for vnode in new_set_vnode {
604 builder.set(vnode.to_index(), true);
605 }
606 let bitmap = Arc::new(builder.finish());
607 new_vnode_watermarks.push(VnodeWatermark {
608 vnode_bitmap: bitmap,
609 watermark: vnode_watermark.watermark.clone(),
610 })
611 }
612 }
613 if !new_vnode_watermarks.is_empty() {
614 if let Some((last_epoch, last_watermarks)) = result_epoch_watermark.last_mut()
615 && *last_epoch == safe_epoch
616 {
617 *last_watermarks = Arc::from(
618 last_watermarks
619 .iter()
620 .cloned()
621 .chain(new_vnode_watermarks.into_iter())
622 .collect_vec(),
623 );
624 } else {
625 result_epoch_watermark.push((safe_epoch, Arc::from(new_vnode_watermarks)));
626 }
627 }
628 }
629 result_epoch_watermark.reverse();
632 assert!(
633 result_epoch_watermark
634 .is_sorted_by(|(first_epoch, _), (second_epoch, _)| { first_epoch < second_epoch })
635 );
636 *self = TableWatermarks {
637 watermarks: result_epoch_watermark,
638 direction: self.direction,
639 watermark_type: self.watermark_type,
640 }
641 }
642}
643
644impl TableWatermarks {
645 pub fn estimated_encode_len(&self) -> usize {
646 self.watermarks.len() * size_of::<HummockEpoch>()
647 + self
648 .watermarks
649 .iter()
650 .map(|(_, watermarks)| {
651 watermarks
652 .iter()
653 .map(|watermark| watermark.estimated_size())
654 .sum::<usize>()
655 })
656 .sum::<usize>()
657 + size_of::<bool>() }
659
660 pub fn to_protobuf(&self) -> PbTableWatermarks {
661 self.into()
662 }
663}
664
665impl From<&PbTableWatermarks> for WatermarkSerdeType {
666 fn from(pb: &PbTableWatermarks) -> Self {
667 match pb.raw_watermark_serde_type() {
668 PbWatermarkSerdeType::TypeUnspecified => {
669 #[expect(deprecated)]
671 if pb.is_non_pk_prefix {
672 WatermarkSerdeType::NonPkPrefix
673 } else {
674 WatermarkSerdeType::PkPrefix
675 }
676 }
677 PbWatermarkSerdeType::PkPrefix => WatermarkSerdeType::PkPrefix,
678 PbWatermarkSerdeType::NonPkPrefix => WatermarkSerdeType::NonPkPrefix,
679 PbWatermarkSerdeType::Value => WatermarkSerdeType::Value,
680 }
681 }
682}
683impl From<&PbTableWatermarks> for TableWatermarks {
684 fn from(pb: &PbTableWatermarks) -> Self {
685 let watermark_type = WatermarkSerdeType::from(pb);
686 Self {
687 watermarks: pb
688 .epoch_watermarks
689 .iter()
690 .map(|epoch_watermark| {
691 let epoch = epoch_watermark.epoch;
692 let watermarks = epoch_watermark
693 .watermarks
694 .iter()
695 .map(VnodeWatermark::from)
696 .collect();
697 (epoch, watermarks)
698 })
699 .collect(),
700 direction: if pb.is_ascending {
701 WatermarkDirection::Ascending
702 } else {
703 WatermarkDirection::Descending
704 },
705 watermark_type,
706 }
707 }
708}
709
710impl From<WatermarkSerdeType> for PbWatermarkSerdeType {
711 fn from(s: WatermarkSerdeType) -> Self {
712 match s {
713 WatermarkSerdeType::PkPrefix => PbWatermarkSerdeType::PkPrefix,
714 WatermarkSerdeType::NonPkPrefix => PbWatermarkSerdeType::NonPkPrefix,
715 WatermarkSerdeType::Value => PbWatermarkSerdeType::Value,
716 }
717 }
718}
719
720impl From<&TableWatermarks> for PbTableWatermarks {
721 fn from(table_watermarks: &TableWatermarks) -> Self {
722 let is_non_pk_prefix = match table_watermarks.watermark_type {
723 WatermarkSerdeType::PkPrefix => false,
724 WatermarkSerdeType::NonPkPrefix => true,
725 WatermarkSerdeType::Value => false,
726 };
727 Self {
728 epoch_watermarks: table_watermarks
729 .watermarks
730 .iter()
731 .map(|(epoch, watermarks)| PbEpochNewWatermarks {
732 watermarks: watermarks.iter().map(|wm| wm.into()).collect(),
733 epoch: *epoch,
734 })
735 .collect(),
736 is_ascending: match table_watermarks.direction {
737 WatermarkDirection::Ascending => true,
738 WatermarkDirection::Descending => false,
739 },
740 #[expect(deprecated)]
741 is_non_pk_prefix,
742 raw_watermark_serde_type: PbWatermarkSerdeType::from(table_watermarks.watermark_type)
743 as i32,
744 }
745 }
746}
747
748impl From<PbTableWatermarks> for TableWatermarks {
749 fn from(pb: PbTableWatermarks) -> Self {
750 let watermark_type = WatermarkSerdeType::from(&pb);
751 Self {
752 watermarks: pb
753 .epoch_watermarks
754 .into_iter()
755 .map(|epoch_watermark| {
756 let epoch = epoch_watermark.epoch;
757 let watermarks = epoch_watermark
758 .watermarks
759 .into_iter()
760 .map(VnodeWatermark::from)
761 .collect();
762 (epoch, watermarks)
763 })
764 .collect(),
765 direction: if pb.is_ascending {
766 WatermarkDirection::Ascending
767 } else {
768 WatermarkDirection::Descending
769 },
770 watermark_type,
771 }
772 }
773}
774
775impl From<TableWatermarks> for PbTableWatermarks {
776 fn from(table_watermarks: TableWatermarks) -> Self {
777 Self {
778 epoch_watermarks: table_watermarks
779 .watermarks
780 .into_iter()
781 .map(|(epoch, watermarks)| PbEpochNewWatermarks {
782 watermarks: watermarks.iter().map(PbVnodeWatermark::from).collect(),
783 epoch,
784 })
785 .collect(),
786 is_ascending: match table_watermarks.direction {
787 WatermarkDirection::Ascending => true,
788 WatermarkDirection::Descending => false,
789 },
790 #[expect(deprecated)]
791 is_non_pk_prefix: match table_watermarks.watermark_type {
792 WatermarkSerdeType::NonPkPrefix => true,
793 WatermarkSerdeType::PkPrefix => false,
794 WatermarkSerdeType::Value => false,
795 },
796 raw_watermark_serde_type: PbWatermarkSerdeType::from(table_watermarks.watermark_type)
797 as i32,
798 }
799 }
800}
801
/// Kind of table watermark, persisted in the protobuf `raw_watermark_serde_type` field.
///
/// `PkPrefixTableWatermarksIndex` only accepts `PkPrefix`. Messages that leave the field
/// unspecified are decoded as `NonPkPrefix` or `PkPrefix` based on the deprecated
/// `is_non_pk_prefix` flag.
// NOTE(review): variant names suggest whether the watermark applies to a pk prefix, a
// non-prefix pk column, or the row value — the exact semantics live outside this file; confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WatermarkSerdeType {
    PkPrefix,
    NonPkPrefix,
    Value,
}
808
809#[cfg(test)]
810mod tests {
811 use std::collections::Bound::Included;
812 use std::collections::{Bound, HashMap};
813 use std::ops::Bound::Excluded;
814 use std::sync::Arc;
815 use std::vec;
816
817 use bytes::Bytes;
818 use itertools::Itertools;
819 use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
820 use risingwave_common::catalog::TableId;
821 use risingwave_common::hash::VirtualNode;
822 use risingwave_common::util::epoch::{EpochExt, test_epoch};
823 use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo};
824
825 use crate::compaction_group::StaticCompactionGroupId;
826 use crate::key::{TableKeyRange, is_empty_key_range, prefixed_range_with_vnode};
827 use crate::table_watermark::{
828 PkPrefixTableWatermarksIndex, TableWatermarks, VnodeWatermark, WatermarkDirection,
829 WatermarkSerdeType, merge_multiple_new_table_watermarks,
830 };
831 use crate::version::HummockVersion;
832
833 fn build_bitmap(vnodes: impl IntoIterator<Item = usize>) -> Arc<Bitmap> {
834 let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
835 for vnode in vnodes {
836 builder.set(vnode, true);
837 }
838 Arc::new(builder.finish())
839 }
840
841 #[test]
842 fn test_apply_new_table_watermark() {
843 let epoch1 = test_epoch(1);
844 let direction = WatermarkDirection::Ascending;
845 let watermark1 = Bytes::from("watermark1");
846 let watermark2 = Bytes::from("watermark2");
847 let watermark3 = Bytes::from("watermark3");
848 let watermark4 = Bytes::from("watermark4");
849 let watermark_type = WatermarkSerdeType::PkPrefix;
850 let mut table_watermarks = TableWatermarks::single_epoch(
851 epoch1,
852 vec![VnodeWatermark::new(build_bitmap(vec![0, 1, 2]), watermark1)],
853 direction,
854 watermark_type,
855 );
856 let epoch2 = epoch1.next_epoch();
857 table_watermarks.add_new_epoch_watermarks(
858 epoch2,
859 vec![VnodeWatermark::new(
860 build_bitmap(vec![0, 1, 2, 3]),
861 watermark2,
862 )]
863 .into(),
864 direction,
865 watermark_type,
866 );
867
868 let mut table_watermark_checkpoint = table_watermarks.clone();
869
870 let epoch3 = epoch2.next_epoch();
871 let mut second_table_watermark = TableWatermarks::single_epoch(
872 epoch3,
873 vec![VnodeWatermark::new(
874 build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
875 watermark3.clone(),
876 )],
877 direction,
878 watermark_type,
879 );
880 table_watermarks.add_new_epoch_watermarks(
881 epoch3,
882 vec![VnodeWatermark::new(
883 build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
884 watermark3,
885 )]
886 .into(),
887 direction,
888 watermark_type,
889 );
890 let epoch4 = epoch3.next_epoch();
891 let epoch5 = epoch4.next_epoch();
892 table_watermarks.add_new_epoch_watermarks(
893 epoch5,
894 vec![VnodeWatermark::new(
895 build_bitmap(vec![0, 3, 4]),
896 watermark4.clone(),
897 )]
898 .into(),
899 direction,
900 watermark_type,
901 );
902 second_table_watermark.add_new_epoch_watermarks(
903 epoch5,
904 vec![VnodeWatermark::new(build_bitmap(vec![0, 3, 4]), watermark4)].into(),
905 direction,
906 watermark_type,
907 );
908
909 table_watermark_checkpoint.apply_new_table_watermarks(&second_table_watermark);
910 assert_eq!(table_watermarks, table_watermark_checkpoint);
911 }
912
913 #[test]
914 fn test_clear_stale_epoch_watmermark() {
915 let epoch1 = test_epoch(1);
916 let direction = WatermarkDirection::Ascending;
917 let watermark1 = Bytes::from("watermark1");
918 let watermark2 = Bytes::from("watermark2");
919 let watermark3 = Bytes::from("watermark3");
920 let watermark4 = Bytes::from("watermark4");
921 let watermark_type = WatermarkSerdeType::PkPrefix;
922 let mut table_watermarks = TableWatermarks::single_epoch(
923 epoch1,
924 vec![VnodeWatermark::new(build_bitmap(vec![0, 1, 2]), watermark1)],
925 direction,
926 watermark_type,
927 );
928 let epoch2 = epoch1.next_epoch();
929 table_watermarks.add_new_epoch_watermarks(
930 epoch2,
931 vec![VnodeWatermark::new(
932 build_bitmap(vec![0, 1, 2, 3]),
933 watermark2.clone(),
934 )]
935 .into(),
936 direction,
937 watermark_type,
938 );
939 let epoch3 = epoch2.next_epoch();
940 table_watermarks.add_new_epoch_watermarks(
941 epoch3,
942 vec![VnodeWatermark::new(
943 build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
944 watermark3.clone(),
945 )]
946 .into(),
947 direction,
948 watermark_type,
949 );
950 let epoch4 = epoch3.next_epoch();
951 let epoch5 = epoch4.next_epoch();
952 table_watermarks.add_new_epoch_watermarks(
953 epoch5,
954 vec![VnodeWatermark::new(
955 build_bitmap(vec![0, 3, 4]),
956 watermark4.clone(),
957 )]
958 .into(),
959 direction,
960 watermark_type,
961 );
962
963 let mut table_watermarks_checkpoint = table_watermarks.clone();
964 table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch1);
965 assert_eq!(table_watermarks_checkpoint, table_watermarks);
966
967 table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch2);
968 assert_eq!(
969 table_watermarks_checkpoint,
970 TableWatermarks {
971 watermarks: vec![
972 (
973 epoch2,
974 vec![VnodeWatermark::new(
975 build_bitmap(vec![0, 1, 2, 3]),
976 watermark2,
977 )]
978 .into()
979 ),
980 (
981 epoch3,
982 vec![VnodeWatermark::new(
983 build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
984 watermark3.clone(),
985 )]
986 .into()
987 ),
988 (
989 epoch5,
990 vec![VnodeWatermark::new(
991 build_bitmap(vec![0, 3, 4]),
992 watermark4.clone(),
993 )]
994 .into()
995 )
996 ],
997 direction,
998 watermark_type,
999 }
1000 );
1001
1002 table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch3);
1003 assert_eq!(
1004 table_watermarks_checkpoint,
1005 TableWatermarks {
1006 watermarks: vec![
1007 (
1008 epoch3,
1009 vec![VnodeWatermark::new(
1010 build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
1011 watermark3.clone(),
1012 )]
1013 .into()
1014 ),
1015 (
1016 epoch5,
1017 vec![VnodeWatermark::new(
1018 build_bitmap(vec![0, 3, 4]),
1019 watermark4.clone(),
1020 )]
1021 .into()
1022 )
1023 ],
1024 direction,
1025 watermark_type,
1026 }
1027 );
1028
1029 table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch4);
1030 assert_eq!(
1031 table_watermarks_checkpoint,
1032 TableWatermarks {
1033 watermarks: vec![
1034 (
1035 epoch4,
1036 vec![VnodeWatermark::new(
1037 build_bitmap((1..3).chain(5..VirtualNode::COUNT_FOR_TEST)),
1038 watermark3.clone()
1039 )]
1040 .into()
1041 ),
1042 (
1043 epoch5,
1044 vec![VnodeWatermark::new(
1045 build_bitmap(vec![0, 3, 4]),
1046 watermark4.clone(),
1047 )]
1048 .into()
1049 )
1050 ],
1051 direction,
1052 watermark_type,
1053 }
1054 );
1055
1056 table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch5);
1057 assert_eq!(
1058 table_watermarks_checkpoint,
1059 TableWatermarks {
1060 watermarks: vec![(
1061 epoch5,
1062 vec![
1063 VnodeWatermark::new(build_bitmap(vec![0, 3, 4]), watermark4),
1064 VnodeWatermark::new(
1065 build_bitmap((1..3).chain(5..VirtualNode::COUNT_FOR_TEST)),
1066 watermark3
1067 )
1068 ]
1069 .into()
1070 )],
1071 direction,
1072 watermark_type,
1073 }
1074 );
1075 }
1076
1077 #[test]
1078 fn test_merge_multiple_new_table_watermarks() {
1079 fn epoch_new_watermark(epoch: u64, bitmaps: Vec<&Bitmap>) -> (u64, Arc<[VnodeWatermark]>) {
1080 (
1081 epoch,
1082 bitmaps
1083 .into_iter()
1084 .map(|bitmap| VnodeWatermark {
1085 watermark: Bytes::from(vec![1, 2, epoch as _]),
1086 vnode_bitmap: Arc::new(bitmap.clone()),
1087 })
1088 .collect_vec()
1089 .into(),
1090 )
1091 }
1092 fn build_table_watermark(
1093 vnodes: impl IntoIterator<Item = usize>,
1094 epochs: impl IntoIterator<Item = u64>,
1095 ) -> TableWatermarks {
1096 let bitmap = build_bitmap(vnodes);
1097 TableWatermarks {
1098 watermarks: epochs
1099 .into_iter()
1100 .map(|epoch: u64| epoch_new_watermark(epoch, vec![&bitmap]))
1101 .collect(),
1102 direction: WatermarkDirection::Ascending,
1103 watermark_type: WatermarkSerdeType::PkPrefix,
1104 }
1105 }
1106 let table1_watermark1 = build_table_watermark(0..3, vec![1, 2, 4]);
1107 let table1_watermark2 = build_table_watermark(4..6, vec![1, 2, 5]);
1108 let table2_watermark = build_table_watermark(0..4, 1..3);
1109 let table3_watermark = build_table_watermark(0..4, 3..5);
1110 let mut first = HashMap::new();
1111 first.insert(TableId::new(1), table1_watermark1);
1112 first.insert(TableId::new(2), table2_watermark.clone());
1113 let mut second = HashMap::new();
1114 second.insert(TableId::new(1), table1_watermark2);
1115 second.insert(TableId::new(3), table3_watermark.clone());
1116 let result = merge_multiple_new_table_watermarks(vec![first, second]);
1117 let mut expected = HashMap::new();
1118 expected.insert(
1119 TableId::new(1),
1120 TableWatermarks {
1121 watermarks: vec![
1122 epoch_new_watermark(1, vec![&build_bitmap(0..3), &build_bitmap(4..6)]),
1123 epoch_new_watermark(2, vec![&build_bitmap(0..3), &build_bitmap(4..6)]),
1124 epoch_new_watermark(4, vec![&build_bitmap(0..3)]),
1125 epoch_new_watermark(5, vec![&build_bitmap(4..6)]),
1126 ],
1127 direction: WatermarkDirection::Ascending,
1128 watermark_type: WatermarkSerdeType::PkPrefix,
1129 },
1130 );
1131 expected.insert(TableId::new(2), table2_watermark);
1132 expected.insert(TableId::new(3), table3_watermark);
1133 assert_eq!(result, expected);
1134 }
1135
1136 const COMMITTED_EPOCH: u64 = test_epoch(1);
1137 const EPOCH1: u64 = test_epoch(2);
1138 const EPOCH2: u64 = test_epoch(3);
1139 const TEST_SINGLE_VNODE: VirtualNode = VirtualNode::from_index(1);
1140
1141 fn build_watermark_range(
1142 direction: WatermarkDirection,
1143 (low, high): (Bound<Bytes>, Bound<Bytes>),
1144 ) -> TableKeyRange {
1145 let range = match direction {
1146 WatermarkDirection::Ascending => (low, high),
1147 WatermarkDirection::Descending => (high, low),
1148 };
1149 prefixed_range_with_vnode(range, TEST_SINGLE_VNODE)
1150 }
1151
1152 fn build_and_test_watermark_index(
1156 direction: WatermarkDirection,
1157 watermark1: Bytes,
1158 watermark2: Bytes,
1159 watermark3: Bytes,
1160 ) -> PkPrefixTableWatermarksIndex {
1161 let mut index = PkPrefixTableWatermarksIndex::new(
1162 direction,
1163 EPOCH1,
1164 vec![VnodeWatermark::new(build_bitmap(0..4), watermark1.clone())],
1165 Some(COMMITTED_EPOCH),
1166 );
1167 index.add_epoch_watermark(
1168 EPOCH2,
1169 vec![VnodeWatermark::new(build_bitmap(1..5), watermark2.clone())].into(),
1170 direction,
1171 );
1172
1173 assert_eq!(
1174 index.read_watermark(VirtualNode::from_index(0), EPOCH1),
1175 Some(watermark1.clone())
1176 );
1177 assert_eq!(
1178 index.read_watermark(VirtualNode::from_index(1), EPOCH1),
1179 Some(watermark1.clone())
1180 );
1181 assert_eq!(
1182 index.read_watermark(VirtualNode::from_index(4), EPOCH1),
1183 None
1184 );
1185 assert_eq!(
1186 index.read_watermark(VirtualNode::from_index(0), EPOCH2),
1187 Some(watermark1.clone())
1188 );
1189 assert_eq!(
1190 index.read_watermark(VirtualNode::from_index(1), EPOCH2),
1191 Some(watermark2.clone())
1192 );
1193 assert_eq!(
1194 index.read_watermark(VirtualNode::from_index(4), EPOCH2),
1195 Some(watermark2.clone())
1196 );
1197 assert_eq!(
1198 index.latest_watermark(VirtualNode::from_index(0)),
1199 Some(watermark1.clone())
1200 );
1201 assert_eq!(
1202 index.latest_watermark(VirtualNode::from_index(1)),
1203 Some(watermark2.clone())
1204 );
1205 assert_eq!(
1206 index.latest_watermark(VirtualNode::from_index(4)),
1207 Some(watermark2.clone())
1208 );
1209
1210 let check_watermark_range =
1212 |query_range: (Bound<Bytes>, Bound<Bytes>),
1213 output_range: Option<(Bound<Bytes>, Bound<Bytes>)>| {
1214 let mut range = build_watermark_range(direction, query_range);
1215 index.rewrite_range_with_table_watermark(EPOCH2, &mut range);
1216 if let Some(output_range) = output_range {
1217 assert_eq!(range, build_watermark_range(direction, output_range));
1218 } else {
1219 assert!(is_empty_key_range(&range));
1220 }
1221 };
1222
1223 check_watermark_range(
1225 (Included(watermark1.clone()), Excluded(watermark3.clone())),
1226 Some((Included(watermark2.clone()), Excluded(watermark3.clone()))),
1227 );
1228
1229 check_watermark_range(
1231 (Included(watermark2.clone()), Excluded(watermark3.clone())),
1232 Some((Included(watermark2.clone()), Excluded(watermark3.clone()))),
1233 );
1234 check_watermark_range(
1235 (Excluded(watermark2.clone()), Excluded(watermark3.clone())),
1236 Some((Excluded(watermark2.clone()), Excluded(watermark3))),
1237 );
1238
1239 check_watermark_range(
1241 (Excluded(watermark1.clone()), Excluded(watermark2.clone())),
1242 None,
1243 );
1244 check_watermark_range(
1245 (Excluded(watermark1), Included(watermark2.clone())),
1246 Some((Included(watermark2.clone()), Included(watermark2))),
1247 );
1248
1249 index
1250 }
1251
/// Runs the shared index checks with byte-ascending watermarks.
#[test]
fn test_watermark_index_ascending() {
    build_and_test_watermark_index(
        WatermarkDirection::Ascending,
        Bytes::from_static(b"watermark1"),
        Bytes::from_static(b"watermark2"),
        Bytes::from_static(b"watermark3"),
    );
}
1264
/// Runs the shared index checks with byte-descending watermarks
/// (each successive watermark sorts lower than the previous one).
#[test]
fn test_watermark_index_descending() {
    build_and_test_watermark_index(
        WatermarkDirection::Descending,
        Bytes::from_static(b"watermark254"),
        Bytes::from_static(b"watermark253"),
        Bytes::from_static(b"watermark252"),
    );
}
1277
/// Commits `EPOCH1` (one watermark covering every vnode) into an index that still
/// has `EPOCH2` staged, then checks the epochs and the per-vnode reads.
///
/// Every vnode must read `watermark1` at `EPOCH1`. At `EPOCH2`, vnodes `1..5` read the
/// staged `watermark2`, and every other vnode must fall back to the committed `watermark1`.
#[test]
fn test_apply_committed_index() {
    let watermark1 = Bytes::from_static(b"watermark1");
    let watermark2 = Bytes::from_static(b"watermark2");
    let watermark3 = Bytes::from_static(b"watermark3");
    let mut index = build_and_test_watermark_index(
        WatermarkDirection::Ascending,
        watermark1.clone(),
        watermark2.clone(),
        watermark3,
    );

    let test_table_id = TableId::from(233);

    let mut version = HummockVersion::from_rpc_protobuf(&PbHummockVersion {
        state_table_info: HashMap::from_iter([(
            test_table_id,
            StateTableInfo {
                committed_epoch: EPOCH1,
                compaction_group_id: StaticCompactionGroupId::StateDefault as _,
            },
        )]),
        ..Default::default()
    });
    version.table_watermarks.insert(
        test_table_id,
        TableWatermarks {
            watermarks: vec![(
                EPOCH1,
                vec![VnodeWatermark {
                    watermark: watermark1.clone(),
                    vnode_bitmap: build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
                }]
                .into(),
            )],
            direction: WatermarkDirection::Ascending,
            watermark_type: WatermarkSerdeType::PkPrefix,
        }
        .into(),
    );
    index.apply_committed_watermarks(
        version
            .table_watermarks
            .get(&test_table_id)
            .unwrap()
            .clone(),
        EPOCH1,
    );
    assert_eq!(EPOCH1, index.committed_epoch.unwrap());
    assert_eq!(EPOCH2, index.latest_epoch);
    for vnode in 0..VirtualNode::COUNT_FOR_TEST {
        let vnode = VirtualNode::from_index(vnode);
        // The committed EPOCH1 watermark covers every vnode.
        assert_eq!(watermark1, index.read_watermark(vnode, EPOCH1).unwrap());
        // EPOCH2 only overrides vnodes 1..5. The previous version never checked EPOCH2
        // for the remaining vnodes, which is exactly the fall-back-to-committed path.
        let expected_at_epoch2 = if (1..5).contains(&vnode.to_index()) {
            &watermark2
        } else {
            &watermark1
        };
        assert_eq!(
            *expected_at_epoch2,
            index.read_watermark(vnode, EPOCH2).unwrap()
        );
    }
}
1338
/// Checks that `filter_regress_watermarks` drops vnodes whose proposed watermark is
/// below the index's latest one, and keeps equal, higher, or previously unset ones.
///
/// Latest watermarks of the index built here: vnode 0 -> `watermark1`,
/// vnodes 1..5 -> `watermark2`, and none for any other vnode.
#[test]
fn test_filter_regress_watermark() {
    let watermark1 = Bytes::from_static(b"watermark1");
    let watermark2 = Bytes::from_static(b"watermark2");
    let watermark3 = Bytes::from_static(b"watermark3");
    let index = build_and_test_watermark_index(
        WatermarkDirection::Ascending,
        watermark1.clone(),
        watermark2,
        watermark3.clone(),
    );

    let vnode_watermark = |vnodes: std::ops::Range<usize>, watermark: &Bytes| VnodeWatermark {
        vnode_bitmap: build_bitmap(vnodes),
        watermark: watermark.clone(),
    };

    let mut new_watermarks = vec![
        vnode_watermark(0..2, &watermark1),
        vnode_watermark(2..4, &watermark3),
        vnode_watermark(4..5, &watermark1),
        vnode_watermark(5..6, &watermark3),
    ];
    index.filter_regress_watermarks(&mut new_watermarks);

    // Vnode 1 and vnode 4 regress from watermark2 to watermark1 and are removed.
    // Vnode 0 is unchanged, 2..4 advance, and vnode 5 had no watermark before.
    let expected = vec![
        vnode_watermark(0..1, &watermark1),
        vnode_watermark(2..4, &watermark3),
        vnode_watermark(5..6, &watermark3),
    ];
    assert_eq!(new_watermarks, expected);
}
1394}