1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
17use std::fmt::Display;
18use std::mem::size_of;
19use std::ops::Bound::{Excluded, Included, Unbounded};
20use std::sync::Arc;
21
22use bytes::Bytes;
23use itertools::Itertools;
24use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
25use risingwave_common::catalog::TableId;
26use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
27use risingwave_common::types::ToDatumRef;
28use risingwave_common::util::sort_util::{OrderType, cmp_datum};
29use risingwave_common_estimate_size::EstimateSize;
30use risingwave_pb::hummock::table_watermarks::PbEpochNewWatermarks;
31use risingwave_pb::hummock::{PbVnodeWatermark, TableWatermarks as PbTableWatermarks};
32use tracing::{debug, warn};
33
34use crate::HummockEpoch;
35use crate::key::{TableKey, TableKeyRange, prefix_slice_with_vnode, vnode};
36
/// A table's watermark direction together with one watermark value per vnode.
///
/// NOTE(review): not used in the visible part of this file; presumably only vnodes that
/// actually have a watermark appear in `vnode_watermarks` — confirm with the producer.
#[derive(Clone)]
pub struct ReadTableWatermark {
    /// Direction in which the watermarks of this table advance.
    pub direction: WatermarkDirection,
    /// Watermark bytes keyed by vnode.
    pub vnode_watermarks: BTreeMap<VirtualNode, Bytes>,
}
42
/// In-memory index of pk-prefix watermarks for one table.
///
/// It combines committed watermarks with newer, not-yet-committed ("staging") ones, so a
/// watermark can be looked up per vnode as of a given read epoch.
#[derive(Clone)]
pub struct PkPrefixTableWatermarksIndex {
    /// Direction in which the watermarks of this table advance.
    pub watermark_direction: WatermarkDirection,
    /// Uncommitted watermarks in strictly increasing epoch order (front = oldest).
    /// Entries at or below the committed epoch are dropped when committed watermarks are
    /// applied.
    pub staging_watermarks: VecDeque<(HummockEpoch, Arc<[VnodeWatermark]>)>,
    /// Committed watermarks; `None` while no committed watermarks have been supplied.
    pub committed_watermarks: Option<Arc<TableWatermarks>>,
    /// Largest epoch observed, from either staging watermarks or the committed epoch.
    latest_epoch: HummockEpoch,
    /// Epoch of the most recently applied committed watermarks, if known.
    committed_epoch: Option<HummockEpoch>,
}
52
53impl PkPrefixTableWatermarksIndex {
54 pub fn new(
55 watermark_direction: WatermarkDirection,
56 first_epoch: HummockEpoch,
57 first_vnode_watermark: Vec<VnodeWatermark>,
58 committed_epoch: Option<HummockEpoch>,
59 ) -> Self {
60 if let Some(committed_epoch) = committed_epoch {
61 assert!(first_epoch > committed_epoch);
62 }
63 Self {
64 watermark_direction,
65 staging_watermarks: VecDeque::from_iter([(
66 first_epoch,
67 Arc::from(first_vnode_watermark),
68 )]),
69 committed_watermarks: None,
70 latest_epoch: first_epoch,
71 committed_epoch,
72 }
73 }
74
75 pub fn new_committed(
76 committed_watermarks: Arc<TableWatermarks>,
77 committed_epoch: HummockEpoch,
78 ) -> Self {
79 assert_eq!(
80 committed_watermarks.watermark_type,
81 WatermarkSerdeType::PkPrefix
82 );
83 Self {
84 watermark_direction: committed_watermarks.direction,
85 staging_watermarks: VecDeque::new(),
86 committed_epoch: Some(committed_epoch),
87 latest_epoch: committed_epoch,
88 committed_watermarks: Some(committed_watermarks),
89 }
90 }
91
92 pub fn read_watermark(&self, vnode: VirtualNode, epoch: HummockEpoch) -> Option<Bytes> {
93 for (watermark_epoch, vnode_watermark_list) in self.staging_watermarks.iter().rev().chain(
95 self.committed_watermarks
96 .iter()
97 .flat_map(|watermarks| watermarks.watermarks.iter().rev()),
98 ) {
99 if *watermark_epoch > epoch {
100 continue;
101 }
102 for vnode_watermark in vnode_watermark_list.as_ref() {
103 if vnode_watermark.vnode_bitmap.is_set(vnode.to_index()) {
104 return Some(vnode_watermark.watermark.clone());
105 }
106 }
107 }
108 None
109 }
110
111 pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
112 self.read_watermark(vnode, HummockEpoch::MAX)
113 }
114
115 pub fn rewrite_range_with_table_watermark(
116 &self,
117 epoch: HummockEpoch,
118 key_range: &mut TableKeyRange,
119 ) {
120 let vnode = vnode(key_range);
121 if let Some(watermark) = self.read_watermark(vnode, epoch) {
122 match self.watermark_direction {
123 WatermarkDirection::Ascending => {
124 let overwrite_start_key = match &key_range.0 {
125 Included(start_key) | Excluded(start_key) => {
126 start_key.key_part() < watermark
127 }
128 Unbounded => true,
129 };
130 if overwrite_start_key {
131 let watermark_key = TableKey(prefix_slice_with_vnode(vnode, &watermark));
132 let fully_filtered = match &key_range.1 {
133 Included(end_key) => end_key < &watermark_key,
134 Excluded(end_key) => end_key <= &watermark_key,
135 Unbounded => false,
136 };
137 if fully_filtered {
138 key_range.1 = Excluded(watermark_key.clone());
139 }
140 key_range.0 = Included(watermark_key);
141 }
142 }
143 WatermarkDirection::Descending => {
144 let overwrite_end_key = match &key_range.1 {
145 Included(end_key) | Excluded(end_key) => end_key.key_part() > watermark,
146 Unbounded => true,
147 };
148 if overwrite_end_key {
149 let watermark_key = TableKey(prefix_slice_with_vnode(vnode, &watermark));
150 let fully_filtered = match &key_range.0 {
151 Included(start_key) => start_key > &watermark_key,
152 Excluded(start_key) => start_key >= &watermark_key,
153 Unbounded => false,
154 };
155 if fully_filtered {
156 *key_range = (Included(watermark_key.clone()), Excluded(watermark_key));
157 } else {
158 key_range.1 = Included(watermark_key);
159 }
160 }
161 }
162 }
163 }
164 }
165
166 pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
167 let mut ret = Vec::with_capacity(watermarks.len());
168 for watermark in watermarks.drain(..) {
169 let vnode_count = watermark.vnode_count();
170
171 let mut regress_vnodes = None;
172 for vnode in watermark.vnode_bitmap.iter_vnodes() {
173 if let Some(prev_watermark) = self.latest_watermark(vnode) {
174 let is_regress = match self.direction() {
175 WatermarkDirection::Ascending => prev_watermark > watermark.watermark,
176 WatermarkDirection::Descending => prev_watermark < watermark.watermark,
177 };
178 if is_regress {
179 warn!(
180 "table watermark regress: {:?} {} {:?} {:?}",
181 self.direction(),
182 vnode,
183 watermark.watermark,
184 prev_watermark
185 );
186 regress_vnodes
187 .get_or_insert_with(|| BitmapBuilder::zeroed(vnode_count))
188 .set(vnode.to_index(), true);
189 }
190 }
191 }
192 if let Some(regress_vnodes) = regress_vnodes {
193 let mut bitmap_builder = None;
194 for vnode in watermark.vnode_bitmap.iter_vnodes() {
195 let vnode_index = vnode.to_index();
196 if !regress_vnodes.is_set(vnode_index) {
197 bitmap_builder
198 .get_or_insert_with(|| BitmapBuilder::zeroed(vnode_count))
199 .set(vnode_index, true);
200 }
201 }
202 if let Some(bitmap_builder) = bitmap_builder {
203 ret.push(VnodeWatermark::new(
204 Arc::new(bitmap_builder.finish()),
205 watermark.watermark,
206 ));
207 }
208 } else {
209 ret.push(watermark);
211 }
212 }
213 *watermarks = ret;
214 }
215
216 pub fn direction(&self) -> WatermarkDirection {
217 self.watermark_direction
218 }
219
220 pub fn add_epoch_watermark(
221 &mut self,
222 epoch: HummockEpoch,
223 vnode_watermark_list: Arc<[VnodeWatermark]>,
224 direction: WatermarkDirection,
225 ) {
226 assert!(epoch > self.latest_epoch);
227 assert_eq!(self.watermark_direction, direction);
228 self.latest_epoch = epoch;
229 #[cfg(debug_assertions)]
230 if !vnode_watermark_list.is_empty() {
231 let vnode_count = vnode_watermark_list[0].vnode_count();
232 let mut vnode_is_set = BitmapBuilder::zeroed(vnode_count);
233 for vnode_watermark in vnode_watermark_list.as_ref() {
234 for vnode in vnode_watermark.vnode_bitmap.iter_ones() {
235 assert!(!vnode_is_set.is_set(vnode));
236 vnode_is_set.set(vnode, true);
237 let vnode = VirtualNode::from_index(vnode);
238 if let Some(prev_watermark) = self.latest_watermark(vnode) {
239 match self.watermark_direction {
240 WatermarkDirection::Ascending => {
241 assert!(vnode_watermark.watermark >= prev_watermark);
242 }
243 WatermarkDirection::Descending => {
244 assert!(vnode_watermark.watermark <= prev_watermark);
245 }
246 };
247 }
248 }
249 }
250 }
251 self.staging_watermarks
252 .push_back((epoch, vnode_watermark_list));
253 }
254
255 pub fn apply_committed_watermarks(
256 &mut self,
257 committed_watermark: Arc<TableWatermarks>,
258 committed_epoch: HummockEpoch,
259 ) {
260 assert_eq!(
261 committed_watermark.watermark_type,
262 WatermarkSerdeType::PkPrefix
263 );
264 assert_eq!(self.watermark_direction, committed_watermark.direction);
265 if let Some(prev_committed_epoch) = self.committed_epoch {
266 assert!(prev_committed_epoch <= committed_epoch);
267 if prev_committed_epoch == committed_epoch {
268 return;
269 }
270 }
271 if self.latest_epoch < committed_epoch {
272 debug!(
273 latest_epoch = self.latest_epoch,
274 committed_epoch, "committed_epoch exceed table watermark latest_epoch"
275 );
276 self.latest_epoch = committed_epoch;
277 }
278 self.committed_epoch = Some(committed_epoch);
279 self.committed_watermarks = Some(committed_watermark);
280 while let Some((old_epoch, _)) = self.staging_watermarks.front()
282 && *old_epoch <= committed_epoch
283 {
284 let _ = self.staging_watermarks.pop_front();
285 }
286 }
287}
288
/// Direction in which a table watermark advances. It also determines which side of the
/// watermark gets filtered.
///
/// With `Ascending`, keys strictly below the watermark are filtered. With `Descending`,
/// keys strictly above it are filtered (see `key_filter_by_watermark`).
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum WatermarkDirection {
    Ascending,
    Descending,
}
294
295impl Display for WatermarkDirection {
296 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
297 match self {
298 WatermarkDirection::Ascending => write!(f, "Ascending"),
299 WatermarkDirection::Descending => write!(f, "Descending"),
300 }
301 }
302}
303
304impl WatermarkDirection {
305 pub fn key_filter_by_watermark(
306 &self,
307 key: impl AsRef<[u8]>,
308 watermark: impl AsRef<[u8]>,
309 ) -> bool {
310 let key = key.as_ref();
311 let watermark = watermark.as_ref();
312 match self {
313 WatermarkDirection::Ascending => key < watermark,
314 WatermarkDirection::Descending => key > watermark,
315 }
316 }
317
318 pub fn datum_filter_by_watermark(
319 &self,
320 watermark_col_in_pk: impl ToDatumRef,
321 watermark: impl ToDatumRef,
322 order_type: OrderType,
323 ) -> bool {
324 let watermark_col_in_pk = watermark_col_in_pk.to_datum_ref();
325 let watermark = watermark.to_datum_ref();
326 match self {
327 WatermarkDirection::Ascending => {
328 cmp_datum(watermark_col_in_pk, watermark, order_type).is_lt()
330 }
331 WatermarkDirection::Descending => {
332 cmp_datum(watermark_col_in_pk, watermark, order_type).is_gt()
334 }
335 }
336 }
337
338 pub fn is_ascending(&self) -> bool {
339 match self {
340 WatermarkDirection::Ascending => true,
341 WatermarkDirection::Descending => false,
342 }
343 }
344}
345
/// A single watermark value that applies to every vnode set in `vnode_bitmap`.
#[derive(Clone, Debug, PartialEq, EstimateSize)]
pub struct VnodeWatermark {
    /// Vnodes this watermark applies to.
    vnode_bitmap: Arc<Bitmap>,
    /// Encoded watermark value.
    watermark: Bytes,
}
351
352impl VnodeWatermark {
353 pub fn new(vnode_bitmap: Arc<Bitmap>, watermark: Bytes) -> Self {
354 Self {
355 vnode_bitmap,
356 watermark,
357 }
358 }
359
360 pub fn vnode_bitmap(&self) -> &Bitmap {
361 &self.vnode_bitmap
362 }
363
364 pub fn vnode_count(&self) -> usize {
366 self.vnode_bitmap.len()
367 }
368
369 pub fn watermark(&self) -> &Bytes {
370 &self.watermark
371 }
372
373 pub fn to_protobuf(&self) -> PbVnodeWatermark {
374 self.into()
375 }
376}
377
378impl From<PbVnodeWatermark> for VnodeWatermark {
379 fn from(pb: PbVnodeWatermark) -> Self {
380 Self {
381 vnode_bitmap: Arc::new(Bitmap::from(pb.vnode_bitmap.as_ref().unwrap())),
382 watermark: Bytes::from(pb.watermark),
383 }
384 }
385}
386
387impl From<&PbVnodeWatermark> for VnodeWatermark {
388 fn from(pb: &PbVnodeWatermark) -> Self {
389 Self {
390 vnode_bitmap: Arc::new(Bitmap::from(pb.vnode_bitmap.as_ref().unwrap())),
391 watermark: Bytes::from(pb.watermark.clone()),
392 }
393 }
394}
395
396impl From<VnodeWatermark> for PbVnodeWatermark {
397 fn from(watermark: VnodeWatermark) -> Self {
398 Self {
399 watermark: watermark.watermark.into(),
400 vnode_bitmap: Some(watermark.vnode_bitmap.to_protobuf()),
401 }
402 }
403}
404
405impl From<&VnodeWatermark> for PbVnodeWatermark {
406 fn from(watermark: &VnodeWatermark) -> Self {
407 Self {
408 watermark: watermark.watermark.to_vec(),
409 vnode_bitmap: Some(watermark.vnode_bitmap.to_protobuf()),
410 }
411 }
412}
413
/// All watermarks recorded for one table.
#[derive(Clone, Debug, PartialEq)]
pub struct TableWatermarks {
    /// `(epoch, vnode watermarks)` entries. `add_new_epoch_watermarks` only appends
    /// strictly increasing epochs.
    pub watermarks: Vec<(HummockEpoch, Arc<[VnodeWatermark]>)>,
    /// Direction in which the watermarks advance.
    pub direction: WatermarkDirection,
    /// How the watermark is serialized relative to the key (pk prefix or not).
    pub watermark_type: WatermarkSerdeType,
}
421
422impl TableWatermarks {
423 pub fn single_epoch(
424 epoch: HummockEpoch,
425 watermarks: Vec<VnodeWatermark>,
426 direction: WatermarkDirection,
427 watermark_type: WatermarkSerdeType,
428 ) -> Self {
429 let mut this = Self {
430 direction,
431 watermarks: Vec::new(),
432 watermark_type,
433 };
434 this.add_new_epoch_watermarks(epoch, watermarks.into(), direction, watermark_type);
435 this
436 }
437
438 pub fn add_new_epoch_watermarks(
439 &mut self,
440 epoch: HummockEpoch,
441 watermarks: Arc<[VnodeWatermark]>,
442 direction: WatermarkDirection,
443 watermark_type: WatermarkSerdeType,
444 ) {
445 assert_eq!(self.direction, direction);
446 assert_eq!(self.watermark_type, watermark_type);
447
448 if let Some((prev_epoch, _)) = self.watermarks.last() {
449 assert!(*prev_epoch < epoch);
450 }
451 if !watermarks.is_empty() {
452 let vnode_count = watermarks[0].vnode_count();
453 for watermark in &*watermarks {
454 assert_eq!(watermark.vnode_count(), vnode_count);
455 }
456 if let Some(existing_vnode_count) = self.vnode_count() {
457 assert_eq!(existing_vnode_count, vnode_count);
458 }
459 }
460 self.watermarks.push((epoch, watermarks));
461 }
462
463 fn vnode_count(&self) -> Option<usize> {
465 self.watermarks
466 .iter()
467 .flat_map(|(_, watermarks)| watermarks.as_ref())
468 .next()
469 .map(|w| w.vnode_count())
470 }
471
472 pub fn from_protobuf(pb: &PbTableWatermarks) -> Self {
473 Self {
474 watermarks: pb
475 .epoch_watermarks
476 .iter()
477 .map(|epoch_watermark| {
478 let epoch = epoch_watermark.epoch;
479 let watermarks = epoch_watermark
480 .watermarks
481 .iter()
482 .map(VnodeWatermark::from)
483 .collect_vec();
484 (epoch, Arc::from(watermarks))
485 })
486 .collect(),
487 direction: if pb.is_ascending {
488 WatermarkDirection::Ascending
489 } else {
490 WatermarkDirection::Descending
491 },
492 watermark_type: if pb.is_non_pk_prefix {
493 WatermarkSerdeType::NonPkPrefix
494 } else {
495 WatermarkSerdeType::PkPrefix
496 },
497 }
498 }
499}
500
501pub fn merge_multiple_new_table_watermarks(
502 table_watermarks_list: impl IntoIterator<Item = HashMap<TableId, TableWatermarks>>,
503) -> HashMap<TableId, TableWatermarks> {
504 #[allow(clippy::type_complexity)]
505 let mut ret: HashMap<
506 TableId,
507 (
508 WatermarkDirection,
509 BTreeMap<u64, Vec<VnodeWatermark>>,
510 WatermarkSerdeType,
511 ),
512 > = HashMap::new();
513 for table_watermarks in table_watermarks_list {
514 for (table_id, new_table_watermarks) in table_watermarks {
515 let epoch_watermarks = match ret.entry(table_id) {
516 Entry::Occupied(entry) => {
517 let (direction, epoch_watermarks, watermark_type) = entry.into_mut();
518 assert_eq!(&new_table_watermarks.direction, direction);
519 assert_eq!(&new_table_watermarks.watermark_type, watermark_type);
520 epoch_watermarks
521 }
522 Entry::Vacant(entry) => {
523 let (_, epoch_watermarks, _) = entry.insert((
524 new_table_watermarks.direction,
525 BTreeMap::new(),
526 new_table_watermarks.watermark_type,
527 ));
528 epoch_watermarks
529 }
530 };
531 for (new_epoch, new_epoch_watermarks) in new_table_watermarks.watermarks {
532 epoch_watermarks
533 .entry(new_epoch)
534 .or_insert_with(Vec::new)
535 .extend(new_epoch_watermarks.iter().cloned());
536 }
537 }
538 }
539 ret.into_iter()
540 .map(
541 |(table_id, (direction, epoch_watermarks, watermark_type))| {
542 (
543 table_id,
544 TableWatermarks {
545 direction,
546 watermarks: epoch_watermarks
548 .into_iter()
549 .map(|(epoch, watermarks)| (epoch, Arc::from(watermarks)))
550 .collect(),
551 watermark_type,
552 },
553 )
554 },
555 )
556 .collect()
557}
558
559impl TableWatermarks {
560 pub fn apply_new_table_watermarks(&mut self, newly_added_watermarks: &TableWatermarks) {
561 assert_eq!(self.direction, newly_added_watermarks.direction);
562 assert!(self.watermarks.iter().map(|(epoch, _)| epoch).is_sorted());
563 assert!(
564 newly_added_watermarks
565 .watermarks
566 .iter()
567 .map(|(epoch, _)| epoch)
568 .is_sorted()
569 );
570 if let Some((prev_last_epoch, _)) = self.watermarks.last()
572 && let Some((new_first_epoch, _)) = newly_added_watermarks.watermarks.first()
573 {
574 assert!(prev_last_epoch < new_first_epoch);
575 }
576 self.watermarks.extend(
577 newly_added_watermarks
578 .watermarks
579 .iter()
580 .map(|(epoch, new_watermarks)| (*epoch, new_watermarks.clone())),
581 );
582 }
583
584 pub fn clear_stale_epoch_watermark(&mut self, safe_epoch: u64) {
585 match self.watermarks.first() {
586 None => {
587 return;
589 }
590 Some((earliest_epoch, _)) => {
591 if *earliest_epoch >= safe_epoch {
592 return;
594 }
595 }
596 }
597 debug!("clear stale table watermark below epoch {}", safe_epoch);
598 let mut result_epoch_watermark = Vec::with_capacity(self.watermarks.len());
599 let mut set_vnode: HashSet<VirtualNode> = HashSet::new();
600 let mut vnode_count: Option<usize> = None; while let Some((epoch, _)) = self.watermarks.last() {
602 if *epoch >= safe_epoch {
603 let (epoch, watermarks) = self.watermarks.pop().expect("have check Some");
604 for watermark in watermarks.as_ref() {
605 vnode_count.get_or_insert_with(|| watermark.vnode_count());
606 for vnode in watermark.vnode_bitmap.iter_vnodes() {
607 set_vnode.insert(vnode);
608 }
609 }
610 result_epoch_watermark.push((epoch, watermarks));
611 } else {
612 break;
613 }
614 }
615 while vnode_count != Some(set_vnode.len())
616 && let Some((_, watermarks)) = self.watermarks.pop()
617 {
618 let mut new_vnode_watermarks = Vec::new();
619 for vnode_watermark in watermarks.as_ref() {
620 let mut new_set_vnode = Vec::new();
621 vnode_count.get_or_insert_with(|| vnode_watermark.vnode_count());
622 for vnode in vnode_watermark.vnode_bitmap.iter_vnodes() {
623 if set_vnode.insert(vnode) {
624 new_set_vnode.push(vnode);
625 }
626 }
627 if !new_set_vnode.is_empty() {
628 let mut builder = BitmapBuilder::zeroed(vnode_watermark.vnode_count());
629 for vnode in new_set_vnode {
630 builder.set(vnode.to_index(), true);
631 }
632 let bitmap = Arc::new(builder.finish());
633 new_vnode_watermarks.push(VnodeWatermark {
634 vnode_bitmap: bitmap,
635 watermark: vnode_watermark.watermark.clone(),
636 })
637 }
638 }
639 if !new_vnode_watermarks.is_empty() {
640 if let Some((last_epoch, last_watermarks)) = result_epoch_watermark.last_mut()
641 && *last_epoch == safe_epoch
642 {
643 *last_watermarks = Arc::from(
644 last_watermarks
645 .iter()
646 .cloned()
647 .chain(new_vnode_watermarks.into_iter())
648 .collect_vec(),
649 );
650 } else {
651 result_epoch_watermark.push((safe_epoch, Arc::from(new_vnode_watermarks)));
652 }
653 }
654 }
655 result_epoch_watermark.reverse();
658 assert!(
659 result_epoch_watermark
660 .is_sorted_by(|(first_epoch, _), (second_epoch, _)| { first_epoch < second_epoch })
661 );
662 *self = TableWatermarks {
663 watermarks: result_epoch_watermark,
664 direction: self.direction,
665 watermark_type: self.watermark_type,
666 }
667 }
668}
669
670impl TableWatermarks {
671 pub fn estimated_encode_len(&self) -> usize {
672 self.watermarks.len() * size_of::<HummockEpoch>()
673 + self
674 .watermarks
675 .iter()
676 .map(|(_, watermarks)| {
677 watermarks
678 .iter()
679 .map(|watermark| watermark.estimated_size())
680 .sum::<usize>()
681 })
682 .sum::<usize>()
683 + size_of::<bool>() }
685
686 pub fn to_protobuf(&self) -> PbTableWatermarks {
687 self.into()
688 }
689}
690
691impl From<&PbTableWatermarks> for TableWatermarks {
692 fn from(pb: &PbTableWatermarks) -> Self {
693 Self {
694 watermarks: pb
695 .epoch_watermarks
696 .iter()
697 .map(|epoch_watermark| {
698 let epoch = epoch_watermark.epoch;
699 let watermarks = epoch_watermark
700 .watermarks
701 .iter()
702 .map(VnodeWatermark::from)
703 .collect();
704 (epoch, watermarks)
705 })
706 .collect(),
707 direction: if pb.is_ascending {
708 WatermarkDirection::Ascending
709 } else {
710 WatermarkDirection::Descending
711 },
712 watermark_type: if pb.is_non_pk_prefix {
713 WatermarkSerdeType::NonPkPrefix
714 } else {
715 WatermarkSerdeType::PkPrefix
716 },
717 }
718 }
719}
720
721impl From<&TableWatermarks> for PbTableWatermarks {
722 fn from(table_watermarks: &TableWatermarks) -> Self {
723 Self {
724 epoch_watermarks: table_watermarks
725 .watermarks
726 .iter()
727 .map(|(epoch, watermarks)| PbEpochNewWatermarks {
728 watermarks: watermarks.iter().map(|wm| wm.into()).collect(),
729 epoch: *epoch,
730 })
731 .collect(),
732 is_ascending: match table_watermarks.direction {
733 WatermarkDirection::Ascending => true,
734 WatermarkDirection::Descending => false,
735 },
736 is_non_pk_prefix: match table_watermarks.watermark_type {
737 WatermarkSerdeType::NonPkPrefix => true,
738 WatermarkSerdeType::PkPrefix => false,
739 },
740 }
741 }
742}
743
744impl From<PbTableWatermarks> for TableWatermarks {
745 fn from(pb: PbTableWatermarks) -> Self {
746 Self {
747 watermarks: pb
748 .epoch_watermarks
749 .into_iter()
750 .map(|epoch_watermark| {
751 let epoch = epoch_watermark.epoch;
752 let watermarks = epoch_watermark
753 .watermarks
754 .into_iter()
755 .map(VnodeWatermark::from)
756 .collect();
757 (epoch, watermarks)
758 })
759 .collect(),
760 direction: if pb.is_ascending {
761 WatermarkDirection::Ascending
762 } else {
763 WatermarkDirection::Descending
764 },
765 watermark_type: if pb.is_non_pk_prefix {
766 WatermarkSerdeType::NonPkPrefix
767 } else {
768 WatermarkSerdeType::PkPrefix
769 },
770 }
771 }
772}
773
774impl From<TableWatermarks> for PbTableWatermarks {
775 fn from(table_watermarks: TableWatermarks) -> Self {
776 Self {
777 epoch_watermarks: table_watermarks
778 .watermarks
779 .into_iter()
780 .map(|(epoch, watermarks)| PbEpochNewWatermarks {
781 watermarks: watermarks.iter().map(PbVnodeWatermark::from).collect(),
782 epoch,
783 })
784 .collect(),
785 is_ascending: match table_watermarks.direction {
786 WatermarkDirection::Ascending => true,
787 WatermarkDirection::Descending => false,
788 },
789 is_non_pk_prefix: match table_watermarks.watermark_type {
790 WatermarkSerdeType::NonPkPrefix => true,
791 WatermarkSerdeType::PkPrefix => false,
792 },
793 }
794 }
795}
796
/// How a table watermark relates to the storage key.
///
/// - `PkPrefix` watermarks are compared against the vnode-prefixed key directly; this is
///   the only type `PkPrefixTableWatermarksIndex` accepts.
/// - `NonPkPrefix` covers the other case. NOTE(review): presumably a watermark column that
///   is not a pk prefix — confirm.
///
/// Encoded in protobuf as the `is_non_pk_prefix` flag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WatermarkSerdeType {
    PkPrefix,
    NonPkPrefix,
}
802
803#[cfg(test)]
804mod tests {
805 use std::collections::Bound::Included;
806 use std::collections::{Bound, HashMap};
807 use std::ops::Bound::Excluded;
808 use std::sync::Arc;
809 use std::vec;
810
811 use bytes::Bytes;
812 use itertools::Itertools;
813 use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
814 use risingwave_common::catalog::TableId;
815 use risingwave_common::hash::VirtualNode;
816 use risingwave_common::util::epoch::{EpochExt, test_epoch};
817 use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo};
818
819 use crate::compaction_group::StaticCompactionGroupId;
820 use crate::key::{TableKeyRange, is_empty_key_range, prefixed_range_with_vnode};
821 use crate::table_watermark::{
822 PkPrefixTableWatermarksIndex, TableWatermarks, VnodeWatermark, WatermarkDirection,
823 WatermarkSerdeType, merge_multiple_new_table_watermarks,
824 };
825 use crate::version::HummockVersion;
826
827 fn build_bitmap(vnodes: impl IntoIterator<Item = usize>) -> Arc<Bitmap> {
828 let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
829 for vnode in vnodes {
830 builder.set(vnode, true);
831 }
832 Arc::new(builder.finish())
833 }
834
835 #[test]
836 fn test_apply_new_table_watermark() {
837 let epoch1 = test_epoch(1);
838 let direction = WatermarkDirection::Ascending;
839 let watermark1 = Bytes::from("watermark1");
840 let watermark2 = Bytes::from("watermark2");
841 let watermark3 = Bytes::from("watermark3");
842 let watermark4 = Bytes::from("watermark4");
843 let watermark_type = WatermarkSerdeType::PkPrefix;
844 let mut table_watermarks = TableWatermarks::single_epoch(
845 epoch1,
846 vec![VnodeWatermark::new(
847 build_bitmap(vec![0, 1, 2]),
848 watermark1.clone(),
849 )],
850 direction,
851 watermark_type,
852 );
853 let epoch2 = epoch1.next_epoch();
854 table_watermarks.add_new_epoch_watermarks(
855 epoch2,
856 vec![VnodeWatermark::new(
857 build_bitmap(vec![0, 1, 2, 3]),
858 watermark2.clone(),
859 )]
860 .into(),
861 direction,
862 watermark_type,
863 );
864
865 let mut table_watermark_checkpoint = table_watermarks.clone();
866
867 let epoch3 = epoch2.next_epoch();
868 let mut second_table_watermark = TableWatermarks::single_epoch(
869 epoch3,
870 vec![VnodeWatermark::new(
871 build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
872 watermark3.clone(),
873 )],
874 direction,
875 watermark_type,
876 );
877 table_watermarks.add_new_epoch_watermarks(
878 epoch3,
879 vec![VnodeWatermark::new(
880 build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
881 watermark3.clone(),
882 )]
883 .into(),
884 direction,
885 watermark_type,
886 );
887 let epoch4 = epoch3.next_epoch();
888 let epoch5 = epoch4.next_epoch();
889 table_watermarks.add_new_epoch_watermarks(
890 epoch5,
891 vec![VnodeWatermark::new(
892 build_bitmap(vec![0, 3, 4]),
893 watermark4.clone(),
894 )]
895 .into(),
896 direction,
897 watermark_type,
898 );
899 second_table_watermark.add_new_epoch_watermarks(
900 epoch5,
901 vec![VnodeWatermark::new(
902 build_bitmap(vec![0, 3, 4]),
903 watermark4.clone(),
904 )]
905 .into(),
906 direction,
907 watermark_type,
908 );
909
910 table_watermark_checkpoint.apply_new_table_watermarks(&second_table_watermark);
911 assert_eq!(table_watermarks, table_watermark_checkpoint);
912 }
913
914 #[test]
915 fn test_clear_stale_epoch_watmermark() {
916 let epoch1 = test_epoch(1);
917 let direction = WatermarkDirection::Ascending;
918 let watermark1 = Bytes::from("watermark1");
919 let watermark2 = Bytes::from("watermark2");
920 let watermark3 = Bytes::from("watermark3");
921 let watermark4 = Bytes::from("watermark4");
922 let watermark_type = WatermarkSerdeType::PkPrefix;
923 let mut table_watermarks = TableWatermarks::single_epoch(
924 epoch1,
925 vec![VnodeWatermark::new(
926 build_bitmap(vec![0, 1, 2]),
927 watermark1.clone(),
928 )],
929 direction,
930 watermark_type,
931 );
932 let epoch2 = epoch1.next_epoch();
933 table_watermarks.add_new_epoch_watermarks(
934 epoch2,
935 vec![VnodeWatermark::new(
936 build_bitmap(vec![0, 1, 2, 3]),
937 watermark2.clone(),
938 )]
939 .into(),
940 direction,
941 watermark_type,
942 );
943 let epoch3 = epoch2.next_epoch();
944 table_watermarks.add_new_epoch_watermarks(
945 epoch3,
946 vec![VnodeWatermark::new(
947 build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
948 watermark3.clone(),
949 )]
950 .into(),
951 direction,
952 watermark_type,
953 );
954 let epoch4 = epoch3.next_epoch();
955 let epoch5 = epoch4.next_epoch();
956 table_watermarks.add_new_epoch_watermarks(
957 epoch5,
958 vec![VnodeWatermark::new(
959 build_bitmap(vec![0, 3, 4]),
960 watermark4.clone(),
961 )]
962 .into(),
963 direction,
964 watermark_type,
965 );
966
967 let mut table_watermarks_checkpoint = table_watermarks.clone();
968 table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch1);
969 assert_eq!(table_watermarks_checkpoint, table_watermarks);
970
971 table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch2);
972 assert_eq!(
973 table_watermarks_checkpoint,
974 TableWatermarks {
975 watermarks: vec![
976 (
977 epoch2,
978 vec![VnodeWatermark::new(
979 build_bitmap(vec![0, 1, 2, 3]),
980 watermark2.clone(),
981 )]
982 .into()
983 ),
984 (
985 epoch3,
986 vec![VnodeWatermark::new(
987 build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
988 watermark3.clone(),
989 )]
990 .into()
991 ),
992 (
993 epoch5,
994 vec![VnodeWatermark::new(
995 build_bitmap(vec![0, 3, 4]),
996 watermark4.clone(),
997 )]
998 .into()
999 )
1000 ],
1001 direction,
1002 watermark_type,
1003 }
1004 );
1005
1006 table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch3);
1007 assert_eq!(
1008 table_watermarks_checkpoint,
1009 TableWatermarks {
1010 watermarks: vec![
1011 (
1012 epoch3,
1013 vec![VnodeWatermark::new(
1014 build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
1015 watermark3.clone(),
1016 )]
1017 .into()
1018 ),
1019 (
1020 epoch5,
1021 vec![VnodeWatermark::new(
1022 build_bitmap(vec![0, 3, 4]),
1023 watermark4.clone(),
1024 )]
1025 .into()
1026 )
1027 ],
1028 direction,
1029 watermark_type,
1030 }
1031 );
1032
1033 table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch4);
1034 assert_eq!(
1035 table_watermarks_checkpoint,
1036 TableWatermarks {
1037 watermarks: vec![
1038 (
1039 epoch4,
1040 vec![VnodeWatermark::new(
1041 build_bitmap((1..3).chain(5..VirtualNode::COUNT_FOR_TEST)),
1042 watermark3.clone()
1043 )]
1044 .into()
1045 ),
1046 (
1047 epoch5,
1048 vec![VnodeWatermark::new(
1049 build_bitmap(vec![0, 3, 4]),
1050 watermark4.clone(),
1051 )]
1052 .into()
1053 )
1054 ],
1055 direction,
1056 watermark_type,
1057 }
1058 );
1059
1060 table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch5);
1061 assert_eq!(
1062 table_watermarks_checkpoint,
1063 TableWatermarks {
1064 watermarks: vec![(
1065 epoch5,
1066 vec![
1067 VnodeWatermark::new(build_bitmap(vec![0, 3, 4]), watermark4.clone()),
1068 VnodeWatermark::new(
1069 build_bitmap((1..3).chain(5..VirtualNode::COUNT_FOR_TEST)),
1070 watermark3.clone()
1071 )
1072 ]
1073 .into()
1074 )],
1075 direction,
1076 watermark_type,
1077 }
1078 );
1079 }
1080
1081 #[test]
1082 fn test_merge_multiple_new_table_watermarks() {
1083 fn epoch_new_watermark(epoch: u64, bitmaps: Vec<&Bitmap>) -> (u64, Arc<[VnodeWatermark]>) {
1084 (
1085 epoch,
1086 bitmaps
1087 .into_iter()
1088 .map(|bitmap| VnodeWatermark {
1089 watermark: Bytes::from(vec![1, 2, epoch as _]),
1090 vnode_bitmap: Arc::new(bitmap.clone()),
1091 })
1092 .collect_vec()
1093 .into(),
1094 )
1095 }
1096 fn build_table_watermark(
1097 vnodes: impl IntoIterator<Item = usize>,
1098 epochs: impl IntoIterator<Item = u64>,
1099 ) -> TableWatermarks {
1100 let bitmap = build_bitmap(vnodes);
1101 TableWatermarks {
1102 watermarks: epochs
1103 .into_iter()
1104 .map(|epoch: u64| epoch_new_watermark(epoch, vec![&bitmap]))
1105 .collect(),
1106 direction: WatermarkDirection::Ascending,
1107 watermark_type: WatermarkSerdeType::PkPrefix,
1108 }
1109 }
1110 let table1_watermark1 = build_table_watermark(0..3, vec![1, 2, 4]);
1111 let table1_watermark2 = build_table_watermark(4..6, vec![1, 2, 5]);
1112 let table2_watermark = build_table_watermark(0..4, 1..3);
1113 let table3_watermark = build_table_watermark(0..4, 3..5);
1114 let mut first = HashMap::new();
1115 first.insert(TableId::new(1), table1_watermark1);
1116 first.insert(TableId::new(2), table2_watermark.clone());
1117 let mut second = HashMap::new();
1118 second.insert(TableId::new(1), table1_watermark2);
1119 second.insert(TableId::new(3), table3_watermark.clone());
1120 let result = merge_multiple_new_table_watermarks(vec![first, second]);
1121 let mut expected = HashMap::new();
1122 expected.insert(
1123 TableId::new(1),
1124 TableWatermarks {
1125 watermarks: vec![
1126 epoch_new_watermark(1, vec![&build_bitmap(0..3), &build_bitmap(4..6)]),
1127 epoch_new_watermark(2, vec![&build_bitmap(0..3), &build_bitmap(4..6)]),
1128 epoch_new_watermark(4, vec![&build_bitmap(0..3)]),
1129 epoch_new_watermark(5, vec![&build_bitmap(4..6)]),
1130 ],
1131 direction: WatermarkDirection::Ascending,
1132 watermark_type: WatermarkSerdeType::PkPrefix,
1133 },
1134 );
1135 expected.insert(TableId::new(2), table2_watermark);
1136 expected.insert(TableId::new(3), table3_watermark);
1137 assert_eq!(result, expected);
1138 }
1139
1140 const COMMITTED_EPOCH: u64 = test_epoch(1);
1141 const EPOCH1: u64 = test_epoch(2);
1142 const EPOCH2: u64 = test_epoch(3);
1143 const TEST_SINGLE_VNODE: VirtualNode = VirtualNode::from_index(1);
1144
1145 fn build_watermark_range(
1146 direction: WatermarkDirection,
1147 (low, high): (Bound<Bytes>, Bound<Bytes>),
1148 ) -> TableKeyRange {
1149 let range = match direction {
1150 WatermarkDirection::Ascending => (low, high),
1151 WatermarkDirection::Descending => (high, low),
1152 };
1153 prefixed_range_with_vnode(range, TEST_SINGLE_VNODE)
1154 }
1155
1156 fn build_and_test_watermark_index(
1160 direction: WatermarkDirection,
1161 watermark1: Bytes,
1162 watermark2: Bytes,
1163 watermark3: Bytes,
1164 ) -> PkPrefixTableWatermarksIndex {
1165 let mut index = PkPrefixTableWatermarksIndex::new(
1166 direction,
1167 EPOCH1,
1168 vec![VnodeWatermark::new(build_bitmap(0..4), watermark1.clone())],
1169 Some(COMMITTED_EPOCH),
1170 );
1171 index.add_epoch_watermark(
1172 EPOCH2,
1173 vec![VnodeWatermark::new(build_bitmap(1..5), watermark2.clone())].into(),
1174 direction,
1175 );
1176
1177 assert_eq!(
1178 index.read_watermark(VirtualNode::from_index(0), EPOCH1),
1179 Some(watermark1.clone())
1180 );
1181 assert_eq!(
1182 index.read_watermark(VirtualNode::from_index(1), EPOCH1),
1183 Some(watermark1.clone())
1184 );
1185 assert_eq!(
1186 index.read_watermark(VirtualNode::from_index(4), EPOCH1),
1187 None
1188 );
1189 assert_eq!(
1190 index.read_watermark(VirtualNode::from_index(0), EPOCH2),
1191 Some(watermark1.clone())
1192 );
1193 assert_eq!(
1194 index.read_watermark(VirtualNode::from_index(1), EPOCH2),
1195 Some(watermark2.clone())
1196 );
1197 assert_eq!(
1198 index.read_watermark(VirtualNode::from_index(4), EPOCH2),
1199 Some(watermark2.clone())
1200 );
1201 assert_eq!(
1202 index.latest_watermark(VirtualNode::from_index(0)),
1203 Some(watermark1.clone())
1204 );
1205 assert_eq!(
1206 index.latest_watermark(VirtualNode::from_index(1)),
1207 Some(watermark2.clone())
1208 );
1209 assert_eq!(
1210 index.latest_watermark(VirtualNode::from_index(4)),
1211 Some(watermark2.clone())
1212 );
1213
1214 let check_watermark_range =
1216 |query_range: (Bound<Bytes>, Bound<Bytes>),
1217 output_range: Option<(Bound<Bytes>, Bound<Bytes>)>| {
1218 let mut range = build_watermark_range(direction, query_range);
1219 index.rewrite_range_with_table_watermark(EPOCH2, &mut range);
1220 if let Some(output_range) = output_range {
1221 assert_eq!(range, build_watermark_range(direction, output_range));
1222 } else {
1223 assert!(is_empty_key_range(&range));
1224 }
1225 };
1226
1227 check_watermark_range(
1229 (Included(watermark1.clone()), Excluded(watermark3.clone())),
1230 Some((Included(watermark2.clone()), Excluded(watermark3.clone()))),
1231 );
1232
1233 check_watermark_range(
1235 (Included(watermark2.clone()), Excluded(watermark3.clone())),
1236 Some((Included(watermark2.clone()), Excluded(watermark3.clone()))),
1237 );
1238 check_watermark_range(
1239 (Excluded(watermark2.clone()), Excluded(watermark3.clone())),
1240 Some((Excluded(watermark2.clone()), Excluded(watermark3.clone()))),
1241 );
1242
1243 check_watermark_range(
1245 (Excluded(watermark1.clone()), Excluded(watermark2.clone())),
1246 None,
1247 );
1248 check_watermark_range(
1249 (Excluded(watermark1.clone()), Included(watermark2.clone())),
1250 Some((Included(watermark2.clone()), Included(watermark2.clone()))),
1251 );
1252
1253 index
1254 }
1255
/// Runs the shared index checks with watermarks that increase byte-wise
/// (`watermark1` < `watermark2` < `watermark3`).
#[test]
fn test_watermark_index_ascending() {
    // The watermarks are not needed after the call, so they are moved in instead of cloned.
    build_and_test_watermark_index(
        WatermarkDirection::Ascending,
        Bytes::from_static(b"watermark1"),
        Bytes::from_static(b"watermark2"),
        Bytes::from_static(b"watermark3"),
    );
}
1268
/// Runs the shared index checks with watermarks that decrease byte-wise
/// (`watermark254` > `watermark253` > `watermark252`), so they advance in descending order.
#[test]
fn test_watermark_index_descending() {
    // The watermarks are not needed after the call, so they are moved in instead of cloned.
    build_and_test_watermark_index(
        WatermarkDirection::Descending,
        Bytes::from_static(b"watermark254"),
        Bytes::from_static(b"watermark253"),
        Bytes::from_static(b"watermark252"),
    );
}
1281
/// Commits a watermark at `EPOCH1` that covers every vnode with `watermark1` into an index
/// that still has `EPOCH2` (vnodes `1..5` -> `watermark2`) staged. Then checks the epochs and
/// per-vnode reads at both epochs.
#[test]
fn test_apply_committed_index() {
    let watermark1 = Bytes::from_static(b"watermark1");
    let watermark2 = Bytes::from_static(b"watermark2");
    // `watermark3` only bounds the range checks inside the helper, so it is moved in directly.
    let mut index = build_and_test_watermark_index(
        WatermarkDirection::Ascending,
        watermark1.clone(),
        watermark2.clone(),
        Bytes::from_static(b"watermark3"),
    );

    let test_table_id = TableId::from(233);

    let mut version = HummockVersion::from_rpc_protobuf(&PbHummockVersion {
        state_table_info: HashMap::from_iter([(
            test_table_id.table_id,
            StateTableInfo {
                committed_epoch: EPOCH1,
                compaction_group_id: StaticCompactionGroupId::StateDefault as _,
            },
        )]),
        ..Default::default()
    });
    version.table_watermarks.insert(
        test_table_id,
        TableWatermarks {
            watermarks: vec![(
                EPOCH1,
                vec![VnodeWatermark {
                    watermark: watermark1.clone(),
                    vnode_bitmap: build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
                }]
                .into(),
            )],
            direction: WatermarkDirection::Ascending,
            watermark_type: WatermarkSerdeType::PkPrefix,
        }
        .into(),
    );
    index.apply_committed_watermarks(
        version
            .table_watermarks
            .get(&test_table_id)
            .unwrap()
            .clone(),
        EPOCH1,
    );
    assert_eq!(EPOCH1, index.committed_epoch.unwrap());
    assert_eq!(EPOCH2, index.latest_epoch);
    for vnode in 0..VirtualNode::COUNT_FOR_TEST {
        let vnode = VirtualNode::from_index(vnode);
        // The committed watermark covers every vnode as of EPOCH1.
        assert_eq!(watermark1, index.read_watermark(vnode, EPOCH1).unwrap());
        // At EPOCH2 the staged watermark overrides vnodes 1..5. Every other vnode must still
        // fall back to the committed watermark; the original test never checked this.
        let expected_at_epoch2 = if (1..5).contains(&vnode.to_index()) {
            &watermark2
        } else {
            &watermark1
        };
        assert_eq!(
            *expected_at_epoch2,
            index.read_watermark(vnode, EPOCH2).unwrap()
        );
    }
}
1342
/// Checks that `filter_regress_watermarks` drops new watermarks that would move a vnode's
/// watermark backwards. Vnodes whose bitmaps become empty are removed entirely.
///
/// Latest watermarks in the index built by `build_and_test_watermark_index`:
/// - vnode 0 -> `watermark1`;
/// - vnodes 1..5 -> `watermark2`;
/// - all other vnodes -> none.
#[test]
fn test_filter_regress_watermark() {
    let watermark1 = Bytes::from_static(b"watermark1");
    let watermark3 = Bytes::from_static(b"watermark3");
    // `watermark2` is only needed to build the index, so it is moved in rather than cloned.
    let index = build_and_test_watermark_index(
        WatermarkDirection::Ascending,
        watermark1.clone(),
        Bytes::from_static(b"watermark2"),
        watermark3.clone(),
    );

    let mut new_watermarks = vec![
        // vnode 0 is equal to its latest watermark (kept); vnode 1 regresses (dropped).
        VnodeWatermark {
            vnode_bitmap: build_bitmap(0..2),
            watermark: watermark1.clone(),
        },
        // Advances past `watermark2`: kept as-is.
        VnodeWatermark {
            vnode_bitmap: build_bitmap(2..4),
            watermark: watermark3.clone(),
        },
        // Regresses below `watermark2` on its only vnode: the whole entry disappears.
        VnodeWatermark {
            vnode_bitmap: build_bitmap(4..5),
            watermark: watermark1.clone(),
        },
        // vnode 5 has no previous watermark: kept as-is.
        VnodeWatermark {
            vnode_bitmap: build_bitmap(5..6),
            watermark: watermark3.clone(),
        },
    ];

    index.filter_regress_watermarks(&mut new_watermarks);

    assert_eq!(
        new_watermarks,
        vec![
            VnodeWatermark {
                vnode_bitmap: build_bitmap(0..1),
                watermark: watermark1,
            },
            VnodeWatermark {
                vnode_bitmap: build_bitmap(2..4),
                watermark: watermark3.clone(),
            },
            VnodeWatermark {
                vnode_bitmap: build_bitmap(5..6),
                watermark: watermark3,
            },
        ]
    );
}
1398}