1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
17use std::fmt::Display;
18use std::mem::size_of;
19use std::ops::Bound::{Excluded, Included, Unbounded};
20use std::sync::Arc;
21
22use bytes::Bytes;
23use itertools::Itertools;
24use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
25use risingwave_common::catalog::TableId;
26use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
27use risingwave_common::types::ToDatumRef;
28use risingwave_common::util::sort_util::{OrderType, cmp_datum};
29use risingwave_common_estimate_size::EstimateSize;
30use risingwave_pb::hummock::table_watermarks::PbEpochNewWatermarks;
31use risingwave_pb::hummock::{
32 PbVnodeWatermark, PbWatermarkSerdeType, TableWatermarks as PbTableWatermarks,
33};
34use tracing::{debug, warn};
35
36use crate::HummockEpoch;
37use crate::key::{TableKey, TableKeyRange, prefix_slice_with_vnode, vnode};
38
/// Per-vnode watermarks of a table together with the direction they are ordered in.
#[derive(Clone)]
pub struct ReadTableWatermark {
    /// Whether the watermarks are ascending or descending.
    pub direction: WatermarkDirection,
    /// Watermark bytes keyed (and therefore ordered) by vnode.
    pub vnode_watermarks: BTreeMap<VirtualNode, Bytes>,
}
44
/// Index over the watermarks of a single table.
///
/// It combines per-epoch "staging" watermarks with an optional set of committed
/// [`TableWatermarks`], and answers per-vnode watermark lookups at a given epoch.
#[derive(Clone)]
pub struct TableWatermarksIndex {
    /// Direction shared by every watermark held in this index.
    pub watermark_direction: WatermarkDirection,
    /// Staging watermarks per epoch. New epochs are pushed at the back and epochs covered by a
    /// committed epoch are popped from the front, so later epochs are at the back.
    pub staging_watermarks: VecDeque<(HummockEpoch, Arc<[VnodeWatermark]>)>,
    /// Committed watermarks, if any have been provided.
    pub committed_watermarks: Option<Arc<TableWatermarks>>,
    /// Greatest epoch recorded so far, either from a staging epoch or a committed epoch.
    latest_epoch: HummockEpoch,
    /// Latest committed epoch known to this index, if any.
    committed_epoch: Option<HummockEpoch>,
    /// Serde type of the watermarks. Range rewriting and regress filtering only act when this
    /// is `PkPrefix`.
    watermark_type: WatermarkSerdeType,
}
55
56impl TableWatermarksIndex {
57 pub fn new(
58 watermark_direction: WatermarkDirection,
59 first_epoch: HummockEpoch,
60 first_vnode_watermark: Vec<VnodeWatermark>,
61 committed_epoch: Option<HummockEpoch>,
62 watermark_type: WatermarkSerdeType,
63 ) -> Self {
64 if let Some(committed_epoch) = committed_epoch {
65 assert!(first_epoch > committed_epoch);
66 }
67 Self {
68 watermark_direction,
69 staging_watermarks: VecDeque::from_iter([(
70 first_epoch,
71 Arc::from(first_vnode_watermark),
72 )]),
73 committed_watermarks: None,
74 latest_epoch: first_epoch,
75 committed_epoch,
76 watermark_type,
77 }
78 }
79
80 pub fn new_committed(
81 committed_watermarks: Arc<TableWatermarks>,
82 committed_epoch: HummockEpoch,
83 watermark_type: WatermarkSerdeType,
84 ) -> Self {
85 Self {
86 watermark_direction: committed_watermarks.direction,
87 staging_watermarks: VecDeque::new(),
88 committed_epoch: Some(committed_epoch),
89 latest_epoch: committed_epoch,
90 committed_watermarks: Some(committed_watermarks),
91 watermark_type,
92 }
93 }
94
95 pub fn read_watermark(&self, vnode: VirtualNode, epoch: HummockEpoch) -> Option<Bytes> {
96 for (watermark_epoch, vnode_watermark_list) in self.staging_watermarks.iter().rev().chain(
98 self.committed_watermarks
99 .iter()
100 .flat_map(|watermarks| watermarks.watermarks.iter().rev()),
101 ) {
102 if *watermark_epoch > epoch {
103 continue;
104 }
105 for vnode_watermark in vnode_watermark_list.as_ref() {
106 if vnode_watermark.vnode_bitmap.is_set(vnode.to_index()) {
107 return Some(vnode_watermark.watermark.clone());
108 }
109 }
110 }
111 None
112 }
113
/// Returns the most recent watermark of `vnode`, i.e. the one visible at `HummockEpoch::MAX`.
pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
    self.read_watermark(vnode, HummockEpoch::MAX)
}
117
118 pub fn rewrite_range_with_table_watermark(
119 &self,
120 epoch: HummockEpoch,
121 key_range: &mut TableKeyRange,
122 ) {
123 if !matches!(self.watermark_type, WatermarkSerdeType::PkPrefix) {
124 return;
126 }
127 let vnode = vnode(key_range);
128 if let Some(watermark) = self.read_watermark(vnode, epoch) {
129 match self.watermark_direction {
130 WatermarkDirection::Ascending => {
131 let overwrite_start_key = match &key_range.0 {
132 Included(start_key) | Excluded(start_key) => {
133 start_key.key_part() < watermark
134 }
135 Unbounded => true,
136 };
137 if overwrite_start_key {
138 let watermark_key = TableKey(prefix_slice_with_vnode(vnode, &watermark));
139 let fully_filtered = match &key_range.1 {
140 Included(end_key) => end_key < &watermark_key,
141 Excluded(end_key) => end_key <= &watermark_key,
142 Unbounded => false,
143 };
144 if fully_filtered {
145 key_range.1 = Excluded(watermark_key.clone());
146 }
147 key_range.0 = Included(watermark_key);
148 }
149 }
150 WatermarkDirection::Descending => {
151 let overwrite_end_key = match &key_range.1 {
152 Included(end_key) | Excluded(end_key) => end_key.key_part() > watermark,
153 Unbounded => true,
154 };
155 if overwrite_end_key {
156 let watermark_key = TableKey(prefix_slice_with_vnode(vnode, &watermark));
157 let fully_filtered = match &key_range.0 {
158 Included(start_key) => start_key > &watermark_key,
159 Excluded(start_key) => start_key >= &watermark_key,
160 Unbounded => false,
161 };
162 if fully_filtered {
163 *key_range = (Included(watermark_key.clone()), Excluded(watermark_key));
164 } else {
165 key_range.1 = Included(watermark_key);
166 }
167 }
168 }
169 }
170 }
171 }
172
173 pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
174 if !matches!(self.watermark_type, WatermarkSerdeType::PkPrefix) {
175 return;
177 }
178 let mut ret = Vec::with_capacity(watermarks.len());
179 for watermark in watermarks.drain(..) {
180 let vnode_count = watermark.vnode_count();
181
182 let mut regress_vnodes = None;
183 for vnode in watermark.vnode_bitmap.iter_vnodes() {
184 if let Some(prev_watermark) = self.latest_watermark(vnode) {
185 let is_regress = match self.direction() {
186 WatermarkDirection::Ascending => prev_watermark > watermark.watermark,
187 WatermarkDirection::Descending => prev_watermark < watermark.watermark,
188 };
189 if is_regress {
190 warn!(
191 "table watermark regress: {:?} {} {:?} {:?}",
192 self.direction(),
193 vnode,
194 watermark.watermark,
195 prev_watermark
196 );
197 regress_vnodes
198 .get_or_insert_with(|| BitmapBuilder::zeroed(vnode_count))
199 .set(vnode.to_index(), true);
200 }
201 }
202 }
203 if let Some(regress_vnodes) = regress_vnodes {
204 let mut bitmap_builder = None;
205 for vnode in watermark.vnode_bitmap.iter_vnodes() {
206 let vnode_index = vnode.to_index();
207 if !regress_vnodes.is_set(vnode_index) {
208 bitmap_builder
209 .get_or_insert_with(|| BitmapBuilder::zeroed(vnode_count))
210 .set(vnode_index, true);
211 }
212 }
213 if let Some(bitmap_builder) = bitmap_builder {
214 ret.push(VnodeWatermark::new(
215 Arc::new(bitmap_builder.finish()),
216 watermark.watermark,
217 ));
218 }
219 } else {
220 ret.push(watermark);
222 }
223 }
224 *watermarks = ret;
225 }
226
/// Returns the direction of the watermarks held by this index.
pub fn direction(&self) -> WatermarkDirection {
    self.watermark_direction
}
230
231 pub fn add_epoch_watermark(
232 &mut self,
233 epoch: HummockEpoch,
234 vnode_watermark_list: Arc<[VnodeWatermark]>,
235 direction: WatermarkDirection,
236 ) {
237 assert!(epoch > self.latest_epoch);
238 assert_eq!(self.watermark_direction, direction);
239 self.latest_epoch = epoch;
240 #[cfg(debug_assertions)]
241 if !vnode_watermark_list.is_empty() {
242 let vnode_count = vnode_watermark_list[0].vnode_count();
243 let mut vnode_is_set = BitmapBuilder::zeroed(vnode_count);
244 for vnode_watermark in vnode_watermark_list.as_ref() {
245 for vnode in vnode_watermark.vnode_bitmap.iter_ones() {
246 assert!(!vnode_is_set.is_set(vnode));
247 vnode_is_set.set(vnode, true);
248 let vnode = VirtualNode::from_index(vnode);
249 if let Some(prev_watermark) = self.latest_watermark(vnode) {
250 match self.watermark_direction {
251 WatermarkDirection::Ascending => {
252 assert!(vnode_watermark.watermark >= prev_watermark);
253 }
254 WatermarkDirection::Descending => {
255 assert!(vnode_watermark.watermark <= prev_watermark);
256 }
257 };
258 }
259 }
260 }
261 }
262 self.staging_watermarks
263 .push_back((epoch, vnode_watermark_list));
264 }
265
266 pub fn apply_committed_watermarks(
267 &mut self,
268 committed_watermark: Arc<TableWatermarks>,
269 committed_epoch: HummockEpoch,
270 ) {
271 assert_eq!(self.watermark_direction, committed_watermark.direction);
272 if let Some(prev_committed_epoch) = self.committed_epoch {
273 assert!(prev_committed_epoch <= committed_epoch);
274 if prev_committed_epoch == committed_epoch {
275 return;
276 }
277 }
278 if self.latest_epoch < committed_epoch {
279 debug!(
280 latest_epoch = self.latest_epoch,
281 committed_epoch, "committed_epoch exceed table watermark latest_epoch"
282 );
283 self.latest_epoch = committed_epoch;
284 }
285 self.committed_epoch = Some(committed_epoch);
286 self.committed_watermarks = Some(committed_watermark);
287 while let Some((old_epoch, _)) = self.staging_watermarks.front()
289 && *old_epoch <= committed_epoch
290 {
291 let _ = self.staging_watermarks.pop_front();
292 }
293 }
294}
295
/// Direction in which a table watermark filters keys.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum WatermarkDirection {
    /// Keys (or datums) smaller than the watermark are filtered.
    Ascending,
    /// Keys (or datums) greater than the watermark are filtered.
    Descending,
}
301
302impl Display for WatermarkDirection {
303 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
304 match self {
305 WatermarkDirection::Ascending => write!(f, "Ascending"),
306 WatermarkDirection::Descending => write!(f, "Descending"),
307 }
308 }
309}
310
311impl WatermarkDirection {
312 pub fn key_filter_by_watermark(
313 &self,
314 key: impl AsRef<[u8]>,
315 watermark: impl AsRef<[u8]>,
316 ) -> bool {
317 let key = key.as_ref();
318 let watermark = watermark.as_ref();
319 match self {
320 WatermarkDirection::Ascending => key < watermark,
321 WatermarkDirection::Descending => key > watermark,
322 }
323 }
324
325 pub fn datum_filter_by_watermark(
326 &self,
327 watermark_col: impl ToDatumRef,
328 watermark: impl ToDatumRef,
329 order_type: OrderType,
330 ) -> bool {
331 let watermark_col = watermark_col.to_datum_ref();
332 let watermark = watermark.to_datum_ref();
333 match self {
334 WatermarkDirection::Ascending => {
335 cmp_datum(watermark_col, watermark, order_type).is_lt()
337 }
338 WatermarkDirection::Descending => {
339 cmp_datum(watermark_col, watermark, order_type).is_gt()
341 }
342 }
343 }
344
345 pub fn is_ascending(&self) -> bool {
346 match self {
347 WatermarkDirection::Ascending => true,
348 WatermarkDirection::Descending => false,
349 }
350 }
351}
352
/// A watermark value together with the set of vnodes it applies to.
#[derive(Clone, Debug, PartialEq, EstimateSize)]
pub struct VnodeWatermark {
    /// Bitmap whose set bits are the indexes of the vnodes sharing this watermark.
    vnode_bitmap: Arc<Bitmap>,
    /// The watermark bytes.
    watermark: Bytes,
}
358
359impl VnodeWatermark {
360 pub fn new(vnode_bitmap: Arc<Bitmap>, watermark: Bytes) -> Self {
361 Self {
362 vnode_bitmap,
363 watermark,
364 }
365 }
366
367 pub fn vnode_bitmap(&self) -> &Bitmap {
368 &self.vnode_bitmap
369 }
370
371 pub fn vnode_count(&self) -> usize {
373 self.vnode_bitmap.len()
374 }
375
376 pub fn watermark(&self) -> &Bytes {
377 &self.watermark
378 }
379
380 pub fn to_protobuf(&self) -> PbVnodeWatermark {
381 self.into()
382 }
383}
384
385impl From<PbVnodeWatermark> for VnodeWatermark {
386 fn from(pb: PbVnodeWatermark) -> Self {
387 Self {
388 vnode_bitmap: Arc::new(Bitmap::from(pb.vnode_bitmap.as_ref().unwrap())),
389 watermark: Bytes::from(pb.watermark),
390 }
391 }
392}
393
394impl From<&PbVnodeWatermark> for VnodeWatermark {
395 fn from(pb: &PbVnodeWatermark) -> Self {
396 Self {
397 vnode_bitmap: Arc::new(Bitmap::from(pb.vnode_bitmap.as_ref().unwrap())),
398 watermark: Bytes::from(pb.watermark.clone()),
399 }
400 }
401}
402
403impl From<VnodeWatermark> for PbVnodeWatermark {
404 fn from(watermark: VnodeWatermark) -> Self {
405 Self {
406 watermark: watermark.watermark.into(),
407 vnode_bitmap: Some(watermark.vnode_bitmap.to_protobuf()),
408 }
409 }
410}
411
412impl From<&VnodeWatermark> for PbVnodeWatermark {
413 fn from(watermark: &VnodeWatermark) -> Self {
414 Self {
415 watermark: watermark.watermark.to_vec(),
416 vnode_bitmap: Some(watermark.vnode_bitmap.to_protobuf()),
417 }
418 }
419}
420
/// The watermarks of a table across epochs.
#[derive(Clone, Debug, PartialEq)]
pub struct TableWatermarks {
    /// Watermarks per epoch. `add_new_epoch_watermarks` and `apply_new_table_watermarks`
    /// only append strictly later epochs, so later epochs are at the back.
    pub watermarks: Vec<(HummockEpoch, Arc<[VnodeWatermark]>)>,
    /// Direction shared by all watermarks of the table.
    pub direction: WatermarkDirection,
    /// Serde type shared by all watermarks of the table.
    pub watermark_type: WatermarkSerdeType,
}
428
429impl TableWatermarks {
430 pub fn single_epoch(
431 epoch: HummockEpoch,
432 watermarks: Vec<VnodeWatermark>,
433 direction: WatermarkDirection,
434 watermark_type: WatermarkSerdeType,
435 ) -> Self {
436 let mut this = Self {
437 direction,
438 watermarks: Vec::new(),
439 watermark_type,
440 };
441 this.add_new_epoch_watermarks(epoch, watermarks.into(), direction, watermark_type);
442 this
443 }
444
445 pub fn add_new_epoch_watermarks(
446 &mut self,
447 epoch: HummockEpoch,
448 watermarks: Arc<[VnodeWatermark]>,
449 direction: WatermarkDirection,
450 watermark_type: WatermarkSerdeType,
451 ) {
452 assert_eq!(self.direction, direction);
453 assert_eq!(self.watermark_type, watermark_type);
454
455 if let Some((prev_epoch, _)) = self.watermarks.last() {
456 assert!(*prev_epoch < epoch);
457 }
458 if !watermarks.is_empty() {
459 let vnode_count = watermarks[0].vnode_count();
460 for watermark in &*watermarks {
461 assert_eq!(watermark.vnode_count(), vnode_count);
462 }
463 if let Some(existing_vnode_count) = self.vnode_count() {
464 assert_eq!(existing_vnode_count, vnode_count);
465 }
466 }
467 self.watermarks.push((epoch, watermarks));
468 }
469
470 fn vnode_count(&self) -> Option<usize> {
472 self.watermarks
473 .iter()
474 .flat_map(|(_, watermarks)| watermarks.as_ref())
475 .next()
476 .map(|w| w.vnode_count())
477 }
478}
479
480pub fn merge_multiple_new_table_watermarks(
481 table_watermarks_list: impl IntoIterator<Item = HashMap<TableId, TableWatermarks>>,
482) -> HashMap<TableId, TableWatermarks> {
483 #[allow(clippy::type_complexity)]
484 let mut ret: HashMap<
485 TableId,
486 (
487 WatermarkDirection,
488 BTreeMap<u64, Vec<VnodeWatermark>>,
489 WatermarkSerdeType,
490 ),
491 > = HashMap::new();
492 for table_watermarks in table_watermarks_list {
493 for (table_id, new_table_watermarks) in table_watermarks {
494 let epoch_watermarks = match ret.entry(table_id) {
495 Entry::Occupied(entry) => {
496 let (direction, epoch_watermarks, watermark_type) = entry.into_mut();
497 assert_eq!(&new_table_watermarks.direction, direction);
498 assert_eq!(&new_table_watermarks.watermark_type, watermark_type);
499 epoch_watermarks
500 }
501 Entry::Vacant(entry) => {
502 let (_, epoch_watermarks, _) = entry.insert((
503 new_table_watermarks.direction,
504 BTreeMap::new(),
505 new_table_watermarks.watermark_type,
506 ));
507 epoch_watermarks
508 }
509 };
510 for (new_epoch, new_epoch_watermarks) in new_table_watermarks.watermarks {
511 epoch_watermarks
512 .entry(new_epoch)
513 .or_insert_with(Vec::new)
514 .extend(new_epoch_watermarks.iter().cloned());
515 }
516 }
517 }
518 ret.into_iter()
519 .map(
520 |(table_id, (direction, epoch_watermarks, watermark_type))| {
521 (
522 table_id,
523 TableWatermarks {
524 direction,
525 watermarks: epoch_watermarks
527 .into_iter()
528 .map(|(epoch, watermarks)| (epoch, Arc::from(watermarks)))
529 .collect(),
530 watermark_type,
531 },
532 )
533 },
534 )
535 .collect()
536}
537
538impl TableWatermarks {
539 pub fn apply_new_table_watermarks(&mut self, newly_added_watermarks: &TableWatermarks) {
540 assert_eq!(self.direction, newly_added_watermarks.direction);
541 assert!(self.watermarks.iter().map(|(epoch, _)| epoch).is_sorted());
542 assert!(
543 newly_added_watermarks
544 .watermarks
545 .iter()
546 .map(|(epoch, _)| epoch)
547 .is_sorted()
548 );
549 if let Some((prev_last_epoch, _)) = self.watermarks.last()
551 && let Some((new_first_epoch, _)) = newly_added_watermarks.watermarks.first()
552 {
553 assert!(prev_last_epoch < new_first_epoch);
554 }
555 self.watermarks.extend(
556 newly_added_watermarks
557 .watermarks
558 .iter()
559 .map(|(epoch, new_watermarks)| (*epoch, new_watermarks.clone())),
560 );
561 }
562
/// Drops watermarks of epochs below `safe_epoch` while preserving, for every vnode, the
/// latest watermark that was written below `safe_epoch`.
///
/// Epochs at or above `safe_epoch` are kept as they are. Older epochs are then visited from
/// newest to oldest. For each vnode not yet covered by a kept or newer watermark, its
/// watermark is carried over into an entry at `safe_epoch`, which is appended to the existing
/// `safe_epoch` entry if there is one. The walk stops once every vnode is covered or no older
/// epoch remains, and anything older is discarded.
///
/// # Panics
///
/// Panics if the resulting epochs are not strictly increasing.
pub fn clear_stale_epoch_watermark(&mut self, safe_epoch: u64) {
    match self.watermarks.first() {
        None => {
            // Nothing stored, nothing to clear.
            return;
        }
        Some((earliest_epoch, _)) => {
            if *earliest_epoch >= safe_epoch {
                // No epoch is below `safe_epoch`, so nothing is stale.
                return;
            }
        }
    }
    debug!("clear stale table watermark below epoch {}", safe_epoch);
    // Built from the newest epoch to the oldest and reversed at the end.
    let mut result_epoch_watermark = Vec::with_capacity(self.watermarks.len());
    // Vnodes that already have a watermark in `result_epoch_watermark`.
    let mut set_vnode: HashSet<VirtualNode> = HashSet::new();
    // Lazily initialized from the first vnode watermark encountered.
    let mut vnode_count: Option<usize> = None;
    // Phase 1: move every epoch at or above `safe_epoch` into the result unchanged.
    while let Some((epoch, _)) = self.watermarks.last() {
        if *epoch >= safe_epoch {
            let (epoch, watermarks) = self.watermarks.pop().expect("have check Some");
            for watermark in watermarks.as_ref() {
                vnode_count.get_or_insert_with(|| watermark.vnode_count());
                for vnode in watermark.vnode_bitmap.iter_vnodes() {
                    set_vnode.insert(vnode);
                }
            }
            result_epoch_watermark.push((epoch, watermarks));
        } else {
            break;
        }
    }
    // Phase 2: walk the stale epochs newest-first until every vnode is covered, keeping
    // only the vnodes that have not been seen yet.
    while vnode_count != Some(set_vnode.len())
        && let Some((_, watermarks)) = self.watermarks.pop()
    {
        let mut new_vnode_watermarks = Vec::new();
        for vnode_watermark in watermarks.as_ref() {
            let mut new_set_vnode = Vec::new();
            vnode_count.get_or_insert_with(|| vnode_watermark.vnode_count());
            for vnode in vnode_watermark.vnode_bitmap.iter_vnodes() {
                // `insert` returns true only for vnodes not covered by a newer watermark.
                if set_vnode.insert(vnode) {
                    new_set_vnode.push(vnode);
                }
            }
            if !new_set_vnode.is_empty() {
                let mut builder = BitmapBuilder::zeroed(vnode_watermark.vnode_count());
                for vnode in new_set_vnode {
                    builder.set(vnode.to_index(), true);
                }
                let bitmap = Arc::new(builder.finish());
                new_vnode_watermarks.push(VnodeWatermark {
                    vnode_bitmap: bitmap,
                    watermark: vnode_watermark.watermark.clone(),
                })
            }
        }
        if !new_vnode_watermarks.is_empty() {
            // Carried-over watermarks are recorded at `safe_epoch`: merged into the existing
            // `safe_epoch` entry when the oldest result entry is at that epoch, otherwise
            // pushed as a new entry.
            if let Some((last_epoch, last_watermarks)) = result_epoch_watermark.last_mut()
                && *last_epoch == safe_epoch
            {
                *last_watermarks = Arc::from(
                    last_watermarks
                        .iter()
                        .cloned()
                        .chain(new_vnode_watermarks.into_iter())
                        .collect_vec(),
                );
            } else {
                result_epoch_watermark.push((safe_epoch, Arc::from(new_vnode_watermarks)));
            }
        }
    }
    // The result was collected newest-first; restore ascending epoch order.
    result_epoch_watermark.reverse();
    assert!(
        result_epoch_watermark
            .is_sorted_by(|(first_epoch, _), (second_epoch, _)| { first_epoch < second_epoch })
    );
    // Whatever is still left in `self.watermarks` is stale and dropped here.
    *self = TableWatermarks {
        watermarks: result_epoch_watermark,
        direction: self.direction,
        watermark_type: self.watermark_type,
    }
}
647}
648
649impl TableWatermarks {
650 pub fn estimated_encode_len(&self) -> usize {
651 self.watermarks.len() * size_of::<HummockEpoch>()
652 + self
653 .watermarks
654 .iter()
655 .map(|(_, watermarks)| {
656 watermarks
657 .iter()
658 .map(|watermark| watermark.estimated_size())
659 .sum::<usize>()
660 })
661 .sum::<usize>()
662 + size_of::<bool>() }
664
665 pub fn to_protobuf(&self) -> PbTableWatermarks {
666 self.into()
667 }
668}
669
/// Derives the watermark serde type from a protobuf message.
///
/// When `raw_watermark_serde_type` is unspecified, the deprecated `is_non_pk_prefix` flag
/// decides between `NonPkPrefix` and `PkPrefix`.
impl From<&PbTableWatermarks> for WatermarkSerdeType {
    fn from(pb: &PbTableWatermarks) -> Self {
        match pb.raw_watermark_serde_type() {
            PbWatermarkSerdeType::TypeUnspecified => {
                #[expect(deprecated)]
                // Fall back to the deprecated flag.
                // NOTE(review): presumably for messages written before the enum field existed.
                if pb.is_non_pk_prefix {
                    WatermarkSerdeType::NonPkPrefix
                } else {
                    WatermarkSerdeType::PkPrefix
                }
            }
            PbWatermarkSerdeType::PkPrefix => WatermarkSerdeType::PkPrefix,
            PbWatermarkSerdeType::NonPkPrefix => WatermarkSerdeType::NonPkPrefix,
            PbWatermarkSerdeType::Value => WatermarkSerdeType::Value,
        }
    }
}
688impl From<&PbTableWatermarks> for TableWatermarks {
689 fn from(pb: &PbTableWatermarks) -> Self {
690 let watermark_type = WatermarkSerdeType::from(pb);
691 Self {
692 watermarks: pb
693 .epoch_watermarks
694 .iter()
695 .map(|epoch_watermark| {
696 let epoch = epoch_watermark.epoch;
697 let watermarks = epoch_watermark
698 .watermarks
699 .iter()
700 .map(VnodeWatermark::from)
701 .collect();
702 (epoch, watermarks)
703 })
704 .collect(),
705 direction: if pb.is_ascending {
706 WatermarkDirection::Ascending
707 } else {
708 WatermarkDirection::Descending
709 },
710 watermark_type,
711 }
712 }
713}
714
/// Maps each [`WatermarkSerdeType`] variant to the protobuf variant of the same name.
impl From<WatermarkSerdeType> for PbWatermarkSerdeType {
    fn from(s: WatermarkSerdeType) -> Self {
        match s {
            WatermarkSerdeType::PkPrefix => PbWatermarkSerdeType::PkPrefix,
            WatermarkSerdeType::NonPkPrefix => PbWatermarkSerdeType::NonPkPrefix,
            WatermarkSerdeType::Value => PbWatermarkSerdeType::Value,
        }
    }
}
724
725impl From<&TableWatermarks> for PbTableWatermarks {
726 fn from(table_watermarks: &TableWatermarks) -> Self {
727 let is_non_pk_prefix = match table_watermarks.watermark_type {
728 WatermarkSerdeType::PkPrefix => false,
729 WatermarkSerdeType::NonPkPrefix => true,
730 WatermarkSerdeType::Value => false,
731 };
732 Self {
733 epoch_watermarks: table_watermarks
734 .watermarks
735 .iter()
736 .map(|(epoch, watermarks)| PbEpochNewWatermarks {
737 watermarks: watermarks.iter().map(|wm| wm.into()).collect(),
738 epoch: *epoch,
739 })
740 .collect(),
741 is_ascending: match table_watermarks.direction {
742 WatermarkDirection::Ascending => true,
743 WatermarkDirection::Descending => false,
744 },
745 #[expect(deprecated)]
746 is_non_pk_prefix,
747 raw_watermark_serde_type: PbWatermarkSerdeType::from(table_watermarks.watermark_type)
748 as i32,
749 }
750 }
751}
752
753impl From<PbTableWatermarks> for TableWatermarks {
754 fn from(pb: PbTableWatermarks) -> Self {
755 let watermark_type = WatermarkSerdeType::from(&pb);
756 Self {
757 watermarks: pb
758 .epoch_watermarks
759 .into_iter()
760 .map(|epoch_watermark| {
761 let epoch = epoch_watermark.epoch;
762 let watermarks = epoch_watermark
763 .watermarks
764 .into_iter()
765 .map(VnodeWatermark::from)
766 .collect();
767 (epoch, watermarks)
768 })
769 .collect(),
770 direction: if pb.is_ascending {
771 WatermarkDirection::Ascending
772 } else {
773 WatermarkDirection::Descending
774 },
775 watermark_type,
776 }
777 }
778}
779
780impl From<TableWatermarks> for PbTableWatermarks {
781 fn from(table_watermarks: TableWatermarks) -> Self {
782 Self {
783 epoch_watermarks: table_watermarks
784 .watermarks
785 .into_iter()
786 .map(|(epoch, watermarks)| PbEpochNewWatermarks {
787 watermarks: watermarks.iter().map(PbVnodeWatermark::from).collect(),
788 epoch,
789 })
790 .collect(),
791 is_ascending: match table_watermarks.direction {
792 WatermarkDirection::Ascending => true,
793 WatermarkDirection::Descending => false,
794 },
795 #[expect(deprecated)]
796 is_non_pk_prefix: match table_watermarks.watermark_type {
797 WatermarkSerdeType::NonPkPrefix => true,
798 WatermarkSerdeType::PkPrefix => false,
799 WatermarkSerdeType::Value => false,
800 },
801 raw_watermark_serde_type: PbWatermarkSerdeType::from(table_watermarks.watermark_type)
802 as i32,
803 }
804 }
805}
806
/// Serde type of a table watermark.
///
/// NOTE(review): judging by the variant names, this describes which part of a row the
/// watermark is serialized from; confirm against the producers. Within this file only
/// `PkPrefix` watermarks are used for key-range rewriting and regress filtering.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WatermarkSerdeType {
    /// Presumably a watermark on a prefix of the primary key; verify.
    PkPrefix,
    /// Presumably a watermark on a primary-key column that is not a prefix; verify.
    NonPkPrefix,
    /// Presumably a watermark on a value (non-key) column; verify.
    Value,
}
813
814#[cfg(test)]
815mod tests {
816 use std::collections::Bound::Included;
817 use std::collections::{Bound, HashMap};
818 use std::ops::Bound::Excluded;
819 use std::sync::Arc;
820 use std::vec;
821
822 use bytes::Bytes;
823 use itertools::Itertools;
824 use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
825 use risingwave_common::catalog::TableId;
826 use risingwave_common::hash::VirtualNode;
827 use risingwave_common::util::epoch::{EpochExt, test_epoch};
828 use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo};
829
830 use crate::compaction_group::StaticCompactionGroupId;
831 use crate::key::{TableKeyRange, is_empty_key_range, prefixed_range_with_vnode};
832 use crate::table_watermark::{
833 TableWatermarks, TableWatermarksIndex, VnodeWatermark, WatermarkDirection,
834 WatermarkSerdeType, merge_multiple_new_table_watermarks,
835 };
836 use crate::version::HummockVersion;
837
838 fn build_bitmap(vnodes: impl IntoIterator<Item = usize>) -> Arc<Bitmap> {
839 let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
840 for vnode in vnodes {
841 builder.set(vnode, true);
842 }
843 Arc::new(builder.finish())
844 }
845
/// Checks that applying a second `TableWatermarks` on top of a checkpoint gives the same
/// result as adding the same epochs one by one.
#[test]
fn test_apply_new_table_watermark() {
    let epoch1 = test_epoch(1);
    let direction = WatermarkDirection::Ascending;
    let watermark1 = Bytes::from("watermark1");
    let watermark2 = Bytes::from("watermark2");
    let watermark3 = Bytes::from("watermark3");
    let watermark4 = Bytes::from("watermark4");
    let watermark_type = WatermarkSerdeType::PkPrefix;
    // Reference watermarks, built epoch by epoch.
    let mut table_watermarks = TableWatermarks::single_epoch(
        epoch1,
        vec![VnodeWatermark::new(build_bitmap(vec![0, 1, 2]), watermark1)],
        direction,
        watermark_type,
    );
    let epoch2 = epoch1.next_epoch();
    table_watermarks.add_new_epoch_watermarks(
        epoch2,
        vec![VnodeWatermark::new(
            build_bitmap(vec![0, 1, 2, 3]),
            watermark2,
        )]
        .into(),
        direction,
        watermark_type,
    );

    // Checkpoint taken after epoch2; epoch3 and epoch5 are applied to it in one batch below.
    let mut table_watermark_checkpoint = table_watermarks.clone();

    let epoch3 = epoch2.next_epoch();
    let mut second_table_watermark = TableWatermarks::single_epoch(
        epoch3,
        vec![VnodeWatermark::new(
            build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
            watermark3.clone(),
        )],
        direction,
        watermark_type,
    );
    table_watermarks.add_new_epoch_watermarks(
        epoch3,
        vec![VnodeWatermark::new(
            build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
            watermark3,
        )]
        .into(),
        direction,
        watermark_type,
    );
    // epoch4 is skipped on purpose: no watermark is added for it.
    let epoch4 = epoch3.next_epoch();
    let epoch5 = epoch4.next_epoch();
    table_watermarks.add_new_epoch_watermarks(
        epoch5,
        vec![VnodeWatermark::new(
            build_bitmap(vec![0, 3, 4]),
            watermark4.clone(),
        )]
        .into(),
        direction,
        watermark_type,
    );
    second_table_watermark.add_new_epoch_watermarks(
        epoch5,
        vec![VnodeWatermark::new(build_bitmap(vec![0, 3, 4]), watermark4)].into(),
        direction,
        watermark_type,
    );

    // Applying the batch must reproduce the reference.
    table_watermark_checkpoint.apply_new_table_watermarks(&second_table_watermark);
    assert_eq!(table_watermarks, table_watermark_checkpoint);
}
917
/// Checks `clear_stale_epoch_watermark` for increasing safe epochs, including a safe epoch
/// that has no watermark of its own (epoch4) and the merge into an existing entry (epoch5).
#[test]
fn test_clear_stale_epoch_watmermark() {
    let epoch1 = test_epoch(1);
    let direction = WatermarkDirection::Ascending;
    let watermark1 = Bytes::from("watermark1");
    let watermark2 = Bytes::from("watermark2");
    let watermark3 = Bytes::from("watermark3");
    let watermark4 = Bytes::from("watermark4");
    let watermark_type = WatermarkSerdeType::PkPrefix;
    // Watermarks at epoch1, epoch2, epoch3 and epoch5; epoch4 has none.
    let mut table_watermarks = TableWatermarks::single_epoch(
        epoch1,
        vec![VnodeWatermark::new(build_bitmap(vec![0, 1, 2]), watermark1)],
        direction,
        watermark_type,
    );
    let epoch2 = epoch1.next_epoch();
    table_watermarks.add_new_epoch_watermarks(
        epoch2,
        vec![VnodeWatermark::new(
            build_bitmap(vec![0, 1, 2, 3]),
            watermark2.clone(),
        )]
        .into(),
        direction,
        watermark_type,
    );
    let epoch3 = epoch2.next_epoch();
    table_watermarks.add_new_epoch_watermarks(
        epoch3,
        vec![VnodeWatermark::new(
            build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
            watermark3.clone(),
        )]
        .into(),
        direction,
        watermark_type,
    );
    let epoch4 = epoch3.next_epoch();
    let epoch5 = epoch4.next_epoch();
    table_watermarks.add_new_epoch_watermarks(
        epoch5,
        vec![VnodeWatermark::new(
            build_bitmap(vec![0, 3, 4]),
            watermark4.clone(),
        )]
        .into(),
        direction,
        watermark_type,
    );

    // Safe epoch equal to the earliest epoch: nothing is stale.
    let mut table_watermarks_checkpoint = table_watermarks.clone();
    table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch1);
    assert_eq!(table_watermarks_checkpoint, table_watermarks);

    // Safe epoch2: epoch1 is dropped, and epoch2 already covers all of its vnodes.
    table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch2);
    assert_eq!(
        table_watermarks_checkpoint,
        TableWatermarks {
            watermarks: vec![
                (
                    epoch2,
                    vec![VnodeWatermark::new(
                        build_bitmap(vec![0, 1, 2, 3]),
                        watermark2,
                    )]
                    .into()
                ),
                (
                    epoch3,
                    vec![VnodeWatermark::new(
                        build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
                        watermark3.clone(),
                    )]
                    .into()
                ),
                (
                    epoch5,
                    vec![VnodeWatermark::new(
                        build_bitmap(vec![0, 3, 4]),
                        watermark4.clone(),
                    )]
                    .into()
                )
            ],
            direction,
            watermark_type,
        }
    );

    // Safe epoch3: epoch2 is dropped, and epoch3 covers every vnode.
    table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch3);
    assert_eq!(
        table_watermarks_checkpoint,
        TableWatermarks {
            watermarks: vec![
                (
                    epoch3,
                    vec![VnodeWatermark::new(
                        build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
                        watermark3.clone(),
                    )]
                    .into()
                ),
                (
                    epoch5,
                    vec![VnodeWatermark::new(
                        build_bitmap(vec![0, 3, 4]),
                        watermark4.clone(),
                    )]
                    .into()
                )
            ],
            direction,
            watermark_type,
        }
    );

    // Safe epoch4: epoch3's watermark is carried over to epoch4 for the vnodes that epoch5
    // does not cover.
    table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch4);
    assert_eq!(
        table_watermarks_checkpoint,
        TableWatermarks {
            watermarks: vec![
                (
                    epoch4,
                    vec![VnodeWatermark::new(
                        build_bitmap((1..3).chain(5..VirtualNode::COUNT_FOR_TEST)),
                        watermark3.clone()
                    )]
                    .into()
                ),
                (
                    epoch5,
                    vec![VnodeWatermark::new(
                        build_bitmap(vec![0, 3, 4]),
                        watermark4.clone(),
                    )]
                    .into()
                )
            ],
            direction,
            watermark_type,
        }
    );

    // Safe epoch5: the carried-over watermark is merged into the existing epoch5 entry.
    table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch5);
    assert_eq!(
        table_watermarks_checkpoint,
        TableWatermarks {
            watermarks: vec![(
                epoch5,
                vec![
                    VnodeWatermark::new(build_bitmap(vec![0, 3, 4]), watermark4),
                    VnodeWatermark::new(
                        build_bitmap((1..3).chain(5..VirtualNode::COUNT_FOR_TEST)),
                        watermark3
                    )
                ]
                .into()
            )],
            direction,
            watermark_type,
        }
    );
}
1081
/// Checks that `merge_multiple_new_table_watermarks` combines watermarks of the same table per
/// epoch and passes through tables that appear in only one input.
#[test]
fn test_merge_multiple_new_table_watermarks() {
    /// Builds one epoch entry with one vnode watermark per bitmap; the watermark bytes are
    /// `[1, 2, epoch]`.
    fn epoch_new_watermark(epoch: u64, bitmaps: Vec<&Bitmap>) -> (u64, Arc<[VnodeWatermark]>) {
        (
            epoch,
            bitmaps
                .into_iter()
                .map(|bitmap| VnodeWatermark {
                    watermark: Bytes::from(vec![1, 2, epoch as _]),
                    vnode_bitmap: Arc::new(bitmap.clone()),
                })
                .collect_vec()
                .into(),
        )
    }
    /// Builds ascending `PkPrefix` table watermarks that use the same vnode set at every epoch.
    fn build_table_watermark(
        vnodes: impl IntoIterator<Item = usize>,
        epochs: impl IntoIterator<Item = u64>,
    ) -> TableWatermarks {
        let bitmap = build_bitmap(vnodes);
        TableWatermarks {
            watermarks: epochs
                .into_iter()
                .map(|epoch: u64| epoch_new_watermark(epoch, vec![&bitmap]))
                .collect(),
            direction: WatermarkDirection::Ascending,
            watermark_type: WatermarkSerdeType::PkPrefix,
        }
    }
    // Table 1 appears in both inputs; tables 2 and 3 appear in one input each.
    let table1_watermark1 = build_table_watermark(0..3, vec![1, 2, 4]);
    let table1_watermark2 = build_table_watermark(4..6, vec![1, 2, 5]);
    let table2_watermark = build_table_watermark(0..4, 1..3);
    let table3_watermark = build_table_watermark(0..4, 3..5);
    let mut first = HashMap::new();
    first.insert(TableId::new(1), table1_watermark1);
    first.insert(TableId::new(2), table2_watermark.clone());
    let mut second = HashMap::new();
    second.insert(TableId::new(1), table1_watermark2);
    second.insert(TableId::new(3), table3_watermark.clone());
    let result = merge_multiple_new_table_watermarks(vec![first, second]);
    let mut expected = HashMap::new();
    expected.insert(
        TableId::new(1),
        TableWatermarks {
            watermarks: vec![
                // Shared epochs hold the watermarks of both inputs, in input order.
                epoch_new_watermark(1, vec![&build_bitmap(0..3), &build_bitmap(4..6)]),
                epoch_new_watermark(2, vec![&build_bitmap(0..3), &build_bitmap(4..6)]),
                epoch_new_watermark(4, vec![&build_bitmap(0..3)]),
                epoch_new_watermark(5, vec![&build_bitmap(4..6)]),
            ],
            direction: WatermarkDirection::Ascending,
            watermark_type: WatermarkSerdeType::PkPrefix,
        },
    );
    expected.insert(TableId::new(2), table2_watermark);
    expected.insert(TableId::new(3), table3_watermark);
    assert_eq!(result, expected);
}
1140
// Epochs and the vnode shared by the watermark index tests below.
/// Committed epoch used when building a test index; earlier than both staging epochs.
const COMMITTED_EPOCH: u64 = test_epoch(1);
/// First staging epoch.
const EPOCH1: u64 = test_epoch(2);
/// Second staging epoch.
const EPOCH2: u64 = test_epoch(3);
/// Vnode used to prefix the test key ranges.
const TEST_SINGLE_VNODE: VirtualNode = VirtualNode::from_index(1);
1145
1146 fn build_watermark_range(
1147 direction: WatermarkDirection,
1148 (low, high): (Bound<Bytes>, Bound<Bytes>),
1149 ) -> TableKeyRange {
1150 let range = match direction {
1151 WatermarkDirection::Ascending => (low, high),
1152 WatermarkDirection::Descending => (high, low),
1153 };
1154 prefixed_range_with_vnode(range, TEST_SINGLE_VNODE)
1155 }
1156
1157 fn build_and_test_watermark_index(
1161 direction: WatermarkDirection,
1162 watermark1: Bytes,
1163 watermark2: Bytes,
1164 watermark3: Bytes,
1165 ) -> TableWatermarksIndex {
1166 let mut index = TableWatermarksIndex::new(
1167 direction,
1168 EPOCH1,
1169 vec![VnodeWatermark::new(build_bitmap(0..4), watermark1.clone())],
1170 Some(COMMITTED_EPOCH),
1171 WatermarkSerdeType::PkPrefix,
1172 );
1173 index.add_epoch_watermark(
1174 EPOCH2,
1175 vec![VnodeWatermark::new(build_bitmap(1..5), watermark2.clone())].into(),
1176 direction,
1177 );
1178
1179 assert_eq!(
1180 index.read_watermark(VirtualNode::from_index(0), EPOCH1),
1181 Some(watermark1.clone())
1182 );
1183 assert_eq!(
1184 index.read_watermark(VirtualNode::from_index(1), EPOCH1),
1185 Some(watermark1.clone())
1186 );
1187 assert_eq!(
1188 index.read_watermark(VirtualNode::from_index(4), EPOCH1),
1189 None
1190 );
1191 assert_eq!(
1192 index.read_watermark(VirtualNode::from_index(0), EPOCH2),
1193 Some(watermark1.clone())
1194 );
1195 assert_eq!(
1196 index.read_watermark(VirtualNode::from_index(1), EPOCH2),
1197 Some(watermark2.clone())
1198 );
1199 assert_eq!(
1200 index.read_watermark(VirtualNode::from_index(4), EPOCH2),
1201 Some(watermark2.clone())
1202 );
1203 assert_eq!(
1204 index.latest_watermark(VirtualNode::from_index(0)),
1205 Some(watermark1.clone())
1206 );
1207 assert_eq!(
1208 index.latest_watermark(VirtualNode::from_index(1)),
1209 Some(watermark2.clone())
1210 );
1211 assert_eq!(
1212 index.latest_watermark(VirtualNode::from_index(4)),
1213 Some(watermark2.clone())
1214 );
1215
1216 let check_watermark_range =
1218 |query_range: (Bound<Bytes>, Bound<Bytes>),
1219 output_range: Option<(Bound<Bytes>, Bound<Bytes>)>| {
1220 let mut range = build_watermark_range(direction, query_range);
1221 index.rewrite_range_with_table_watermark(EPOCH2, &mut range);
1222 if let Some(output_range) = output_range {
1223 assert_eq!(range, build_watermark_range(direction, output_range));
1224 } else {
1225 assert!(is_empty_key_range(&range));
1226 }
1227 };
1228
1229 check_watermark_range(
1231 (Included(watermark1.clone()), Excluded(watermark3.clone())),
1232 Some((Included(watermark2.clone()), Excluded(watermark3.clone()))),
1233 );
1234
1235 check_watermark_range(
1237 (Included(watermark2.clone()), Excluded(watermark3.clone())),
1238 Some((Included(watermark2.clone()), Excluded(watermark3.clone()))),
1239 );
1240 check_watermark_range(
1241 (Excluded(watermark2.clone()), Excluded(watermark3.clone())),
1242 Some((Excluded(watermark2.clone()), Excluded(watermark3))),
1243 );
1244
1245 check_watermark_range(
1247 (Excluded(watermark1.clone()), Excluded(watermark2.clone())),
1248 None,
1249 );
1250 check_watermark_range(
1251 (Excluded(watermark1), Included(watermark2.clone())),
1252 Some((Included(watermark2.clone()), Included(watermark2))),
1253 );
1254
1255 index
1256 }
1257
/// Runs the shared watermark-index scenario in ascending direction, with
/// watermarks that increase in byte order.
#[test]
fn test_watermark_index_ascending() {
    build_and_test_watermark_index(
        WatermarkDirection::Ascending,
        Bytes::from_static(b"watermark1"),
        Bytes::from_static(b"watermark2"),
        Bytes::from_static(b"watermark3"),
    );
}
1270
/// Runs the shared watermark-index scenario in descending direction, with
/// watermarks that decrease in byte order.
#[test]
fn test_watermark_index_descending() {
    build_and_test_watermark_index(
        WatermarkDirection::Descending,
        Bytes::from_static(b"watermark254"),
        Bytes::from_static(b"watermark253"),
        Bytes::from_static(b"watermark252"),
    );
}
1283
/// Applies committed watermarks for `EPOCH1` on top of the index produced by
/// `build_and_test_watermark_index` and checks the resulting epochs and reads.
#[test]
fn test_apply_committed_index() {
    let watermark1 = Bytes::from_static(b"watermark1");
    let watermark2 = Bytes::from_static(b"watermark2");
    let mut index = build_and_test_watermark_index(
        WatermarkDirection::Ascending,
        watermark1.clone(),
        watermark2.clone(),
        Bytes::from_static(b"watermark3"),
    );

    let test_table_id = TableId::from(233);

    // A version whose only state table has committed up to EPOCH1.
    let table_info = StateTableInfo {
        committed_epoch: EPOCH1,
        compaction_group_id: StaticCompactionGroupId::StateDefault,
    };
    let mut version = HummockVersion::from_rpc_protobuf(&PbHummockVersion {
        state_table_info: std::iter::once((test_table_id, table_info)).collect(),
        ..Default::default()
    });

    // Committed watermarks: `watermark1` at EPOCH1 on every test vnode.
    let all_vnodes_watermark = VnodeWatermark {
        watermark: watermark1.clone(),
        vnode_bitmap: build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
    };
    let committed_watermarks = TableWatermarks {
        watermarks: vec![(EPOCH1, vec![all_vnodes_watermark].into())],
        direction: WatermarkDirection::Ascending,
        watermark_type: WatermarkSerdeType::PkPrefix,
    };
    version
        .table_watermarks
        .insert(test_table_id, committed_watermarks.into());

    // Feed the index with what the version stores for the table.
    let from_version = version.table_watermarks.get(&test_table_id).cloned().unwrap();
    index.apply_committed_watermarks(from_version, EPOCH1);

    assert_eq!(EPOCH1, index.committed_epoch.unwrap());
    assert_eq!(EPOCH2, index.latest_epoch);

    for vnode in (0..VirtualNode::COUNT_FOR_TEST).map(VirtualNode::from_index) {
        // Every vnode reads the committed watermark at EPOCH1.
        assert_eq!(watermark1, index.read_watermark(vnode, EPOCH1).unwrap());
        // Vnodes 1..5 additionally carry the `watermark2` batch added at EPOCH2.
        if (1..5).contains(&vnode.to_index()) {
            assert_eq!(watermark2, index.read_watermark(vnode, EPOCH2).unwrap());
        }
    }
}
1344
/// Checks which vnodes `filter_regress_watermarks` keeps in a new batch.
///
/// The index comes from `build_and_test_watermark_index`, which puts
/// `watermark1` on vnodes `0..4` at `EPOCH1` and `watermark2` on vnodes `1..5`
/// at `EPOCH2`; vnode 5 is never given a watermark there.
#[test]
fn test_filter_regress_watermark() {
    let watermark1 = Bytes::from_static(b"watermark1");
    let watermark2 = Bytes::from_static(b"watermark2");
    let watermark3 = Bytes::from_static(b"watermark3");
    let index = build_and_test_watermark_index(
        WatermarkDirection::Ascending,
        watermark1.clone(),
        watermark2,
        watermark3.clone(),
    );

    // Shorthand for a `VnodeWatermark` covering `vnodes` with a copy of `watermark`.
    let vnode_watermark = |vnodes: std::ops::Range<usize>, watermark: &Bytes| VnodeWatermark {
        vnode_bitmap: build_bitmap(vnodes),
        watermark: watermark.clone(),
    };

    let mut new_watermarks = vec![
        // Vnode 0 stays at watermark1, while vnode 1 would fall back from watermark2.
        vnode_watermark(0..2, &watermark1),
        // Vnodes 2 and 3 advance to watermark3.
        vnode_watermark(2..4, &watermark3),
        // Vnode 4 would fall back from watermark2.
        vnode_watermark(4..5, &watermark1),
        // Vnode 5 gets a watermark for the first time.
        vnode_watermark(5..6, &watermark3),
    ];

    index.filter_regress_watermarks(&mut new_watermarks);

    // Vnode 1 is stripped from the first entry and the vnode-4 entry is gone.
    let expected = vec![
        vnode_watermark(0..1, &watermark1),
        vnode_watermark(2..4, &watermark3),
        vnode_watermark(5..6, &watermark3),
    ];
    assert_eq!(new_watermarks, expected);
}
1400}