1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
17use std::fmt::Display;
18use std::mem::size_of;
19use std::ops::Bound::{Excluded, Included, Unbounded};
20use std::sync::Arc;
21
22use bytes::Bytes;
23use itertools::Itertools;
24use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
25use risingwave_common::catalog::TableId;
26use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
27use risingwave_common::types::ToDatumRef;
28use risingwave_common::util::sort_util::{OrderType, cmp_datum};
29use risingwave_common_estimate_size::EstimateSize;
30use risingwave_pb::hummock::table_watermarks::PbEpochNewWatermarks;
31use risingwave_pb::hummock::{PbVnodeWatermark, TableWatermarks as PbTableWatermarks};
32use tracing::{debug, warn};
33
34use crate::HummockEpoch;
35use crate::key::{TableKey, TableKeyRange, prefix_slice_with_vnode, vnode};
36
/// Per-vnode watermark values of one table, paired with the direction in
/// which those watermarks advance.
#[derive(Clone)]
pub struct ReadTableWatermark {
    /// Direction in which the watermarks of this table advance.
    pub direction: WatermarkDirection,
    /// Watermark bytes keyed by vnode.
    pub vnode_watermarks: BTreeMap<VirtualNode, Bytes>,
}
42
/// Index over the pk-prefix watermarks of a single table.
///
/// It combines the watermarks of epochs that are still staging with the most
/// recently applied committed [`TableWatermarks`], so that the watermark of a
/// vnode can be looked up as of a given epoch (see `read_watermark`).
#[derive(Clone)]
pub struct PkPrefixTableWatermarksIndex {
    /// Direction shared by every watermark tracked by this index.
    pub watermark_direction: WatermarkDirection,
    /// Watermarks of staging epochs. Entries are appended with strictly
    /// increasing epochs, so the deque is ordered by ascending epoch from
    /// front to back.
    pub staging_watermarks: VecDeque<(HummockEpoch, Arc<[VnodeWatermark]>)>,
    /// Most recently applied committed watermarks, or `None` when the index
    /// was built from a staging epoch and no commit has been applied yet.
    pub committed_watermarks: Option<Arc<TableWatermarks>>,
    /// Greatest epoch this index has seen, either staged or committed.
    latest_epoch: HummockEpoch,
    /// Epoch of the latest commit known to this index, if any.
    committed_epoch: Option<HummockEpoch>,
}
52
53impl PkPrefixTableWatermarksIndex {
54 pub fn new(
55 watermark_direction: WatermarkDirection,
56 first_epoch: HummockEpoch,
57 first_vnode_watermark: Vec<VnodeWatermark>,
58 committed_epoch: Option<HummockEpoch>,
59 ) -> Self {
60 if let Some(committed_epoch) = committed_epoch {
61 assert!(first_epoch > committed_epoch);
62 }
63 Self {
64 watermark_direction,
65 staging_watermarks: VecDeque::from_iter([(
66 first_epoch,
67 Arc::from(first_vnode_watermark),
68 )]),
69 committed_watermarks: None,
70 latest_epoch: first_epoch,
71 committed_epoch,
72 }
73 }
74
75 pub fn new_committed(
76 committed_watermarks: Arc<TableWatermarks>,
77 committed_epoch: HummockEpoch,
78 ) -> Self {
79 assert_eq!(
80 committed_watermarks.watermark_type,
81 WatermarkSerdeType::PkPrefix
82 );
83 Self {
84 watermark_direction: committed_watermarks.direction,
85 staging_watermarks: VecDeque::new(),
86 committed_epoch: Some(committed_epoch),
87 latest_epoch: committed_epoch,
88 committed_watermarks: Some(committed_watermarks),
89 }
90 }
91
92 pub fn read_watermark(&self, vnode: VirtualNode, epoch: HummockEpoch) -> Option<Bytes> {
93 for (watermark_epoch, vnode_watermark_list) in self.staging_watermarks.iter().rev().chain(
95 self.committed_watermarks
96 .iter()
97 .flat_map(|watermarks| watermarks.watermarks.iter().rev()),
98 ) {
99 if *watermark_epoch > epoch {
100 continue;
101 }
102 for vnode_watermark in vnode_watermark_list.as_ref() {
103 if vnode_watermark.vnode_bitmap.is_set(vnode.to_index()) {
104 return Some(vnode_watermark.watermark.clone());
105 }
106 }
107 }
108 None
109 }
110
    /// Returns the newest watermark known for `vnode`, i.e. the result of
    /// `read_watermark` with no upper bound on the epoch.
    pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
        self.read_watermark(vnode, HummockEpoch::MAX)
    }
114
115 pub fn rewrite_range_with_table_watermark(
116 &self,
117 epoch: HummockEpoch,
118 key_range: &mut TableKeyRange,
119 ) {
120 let vnode = vnode(key_range);
121 if let Some(watermark) = self.read_watermark(vnode, epoch) {
122 match self.watermark_direction {
123 WatermarkDirection::Ascending => {
124 let overwrite_start_key = match &key_range.0 {
125 Included(start_key) | Excluded(start_key) => {
126 start_key.key_part() < watermark
127 }
128 Unbounded => true,
129 };
130 if overwrite_start_key {
131 let watermark_key = TableKey(prefix_slice_with_vnode(vnode, &watermark));
132 let fully_filtered = match &key_range.1 {
133 Included(end_key) => end_key < &watermark_key,
134 Excluded(end_key) => end_key <= &watermark_key,
135 Unbounded => false,
136 };
137 if fully_filtered {
138 key_range.1 = Excluded(watermark_key.clone());
139 }
140 key_range.0 = Included(watermark_key);
141 }
142 }
143 WatermarkDirection::Descending => {
144 let overwrite_end_key = match &key_range.1 {
145 Included(end_key) | Excluded(end_key) => end_key.key_part() > watermark,
146 Unbounded => true,
147 };
148 if overwrite_end_key {
149 let watermark_key = TableKey(prefix_slice_with_vnode(vnode, &watermark));
150 let fully_filtered = match &key_range.0 {
151 Included(start_key) => start_key > &watermark_key,
152 Excluded(start_key) => start_key >= &watermark_key,
153 Unbounded => false,
154 };
155 if fully_filtered {
156 *key_range = (Included(watermark_key.clone()), Excluded(watermark_key));
157 } else {
158 key_range.1 = Included(watermark_key);
159 }
160 }
161 }
162 }
163 }
164 }
165
166 pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
167 let mut ret = Vec::with_capacity(watermarks.len());
168 for watermark in watermarks.drain(..) {
169 let vnode_count = watermark.vnode_count();
170
171 let mut regress_vnodes = None;
172 for vnode in watermark.vnode_bitmap.iter_vnodes() {
173 if let Some(prev_watermark) = self.latest_watermark(vnode) {
174 let is_regress = match self.direction() {
175 WatermarkDirection::Ascending => prev_watermark > watermark.watermark,
176 WatermarkDirection::Descending => prev_watermark < watermark.watermark,
177 };
178 if is_regress {
179 warn!(
180 "table watermark regress: {:?} {} {:?} {:?}",
181 self.direction(),
182 vnode,
183 watermark.watermark,
184 prev_watermark
185 );
186 regress_vnodes
187 .get_or_insert_with(|| BitmapBuilder::zeroed(vnode_count))
188 .set(vnode.to_index(), true);
189 }
190 }
191 }
192 if let Some(regress_vnodes) = regress_vnodes {
193 let mut bitmap_builder = None;
194 for vnode in watermark.vnode_bitmap.iter_vnodes() {
195 let vnode_index = vnode.to_index();
196 if !regress_vnodes.is_set(vnode_index) {
197 bitmap_builder
198 .get_or_insert_with(|| BitmapBuilder::zeroed(vnode_count))
199 .set(vnode_index, true);
200 }
201 }
202 if let Some(bitmap_builder) = bitmap_builder {
203 ret.push(VnodeWatermark::new(
204 Arc::new(bitmap_builder.finish()),
205 watermark.watermark,
206 ));
207 }
208 } else {
209 ret.push(watermark);
211 }
212 }
213 *watermarks = ret;
214 }
215
    /// Returns the watermark direction of this index.
    pub fn direction(&self) -> WatermarkDirection {
        self.watermark_direction
    }
219
220 pub fn add_epoch_watermark(
221 &mut self,
222 epoch: HummockEpoch,
223 vnode_watermark_list: Arc<[VnodeWatermark]>,
224 direction: WatermarkDirection,
225 ) {
226 assert!(epoch > self.latest_epoch);
227 assert_eq!(self.watermark_direction, direction);
228 self.latest_epoch = epoch;
229 #[cfg(debug_assertions)]
230 if !vnode_watermark_list.is_empty() {
231 let vnode_count = vnode_watermark_list[0].vnode_count();
232 let mut vnode_is_set = BitmapBuilder::zeroed(vnode_count);
233 for vnode_watermark in vnode_watermark_list.as_ref() {
234 for vnode in vnode_watermark.vnode_bitmap.iter_ones() {
235 assert!(!vnode_is_set.is_set(vnode));
236 vnode_is_set.set(vnode, true);
237 let vnode = VirtualNode::from_index(vnode);
238 if let Some(prev_watermark) = self.latest_watermark(vnode) {
239 match self.watermark_direction {
240 WatermarkDirection::Ascending => {
241 assert!(vnode_watermark.watermark >= prev_watermark);
242 }
243 WatermarkDirection::Descending => {
244 assert!(vnode_watermark.watermark <= prev_watermark);
245 }
246 };
247 }
248 }
249 }
250 }
251 self.staging_watermarks
252 .push_back((epoch, vnode_watermark_list));
253 }
254
255 pub fn apply_committed_watermarks(
256 &mut self,
257 committed_watermark: Arc<TableWatermarks>,
258 committed_epoch: HummockEpoch,
259 ) {
260 assert_eq!(
261 committed_watermark.watermark_type,
262 WatermarkSerdeType::PkPrefix
263 );
264 assert_eq!(self.watermark_direction, committed_watermark.direction);
265 if let Some(prev_committed_epoch) = self.committed_epoch {
266 assert!(prev_committed_epoch <= committed_epoch);
267 if prev_committed_epoch == committed_epoch {
268 return;
269 }
270 }
271 if self.latest_epoch < committed_epoch {
272 debug!(
273 latest_epoch = self.latest_epoch,
274 committed_epoch, "committed_epoch exceed table watermark latest_epoch"
275 );
276 self.latest_epoch = committed_epoch;
277 }
278 self.committed_epoch = Some(committed_epoch);
279 self.committed_watermarks = Some(committed_watermark);
280 while let Some((old_epoch, _)) = self.staging_watermarks.front()
282 && *old_epoch <= committed_epoch
283 {
284 let _ = self.staging_watermarks.pop_front();
285 }
286 }
287}
288
/// Direction in which the watermark of a table advances.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum WatermarkDirection {
    /// Keys ordered before the watermark are the filtered ones.
    Ascending,
    /// Keys ordered after the watermark are the filtered ones.
    Descending,
}

impl Display for WatermarkDirection {
    /// Writes the variant name (`Ascending` / `Descending`) verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::Ascending => "Ascending",
            Self::Descending => "Descending",
        };
        f.write_str(name)
    }
}
303
304impl WatermarkDirection {
305 pub fn key_filter_by_watermark(
306 &self,
307 key: impl AsRef<[u8]>,
308 watermark: impl AsRef<[u8]>,
309 ) -> bool {
310 let key = key.as_ref();
311 let watermark = watermark.as_ref();
312 match self {
313 WatermarkDirection::Ascending => key < watermark,
314 WatermarkDirection::Descending => key > watermark,
315 }
316 }
317
318 pub fn datum_filter_by_watermark(
319 &self,
320 watermark_col_in_pk: impl ToDatumRef,
321 watermark: impl ToDatumRef,
322 order_type: OrderType,
323 ) -> bool {
324 let watermark_col_in_pk = watermark_col_in_pk.to_datum_ref();
325 let watermark = watermark.to_datum_ref();
326 match self {
327 WatermarkDirection::Ascending => {
328 cmp_datum(watermark_col_in_pk, watermark, order_type).is_lt()
330 }
331 WatermarkDirection::Descending => {
332 cmp_datum(watermark_col_in_pk, watermark, order_type).is_gt()
334 }
335 }
336 }
337
338 pub fn is_ascending(&self) -> bool {
339 match self {
340 WatermarkDirection::Ascending => true,
341 WatermarkDirection::Descending => false,
342 }
343 }
344}
345
/// A watermark value together with the set of vnodes it applies to.
#[derive(Clone, Debug, PartialEq, EstimateSize)]
pub struct VnodeWatermark {
    /// Bitmap whose set bits are the indexes of the covered vnodes.
    vnode_bitmap: Arc<Bitmap>,
    /// Watermark bytes shared by all covered vnodes.
    watermark: Bytes,
}
351
impl VnodeWatermark {
    /// Creates a watermark that applies to the vnodes set in `vnode_bitmap`.
    pub fn new(vnode_bitmap: Arc<Bitmap>, watermark: Bytes) -> Self {
        Self {
            vnode_bitmap,
            watermark,
        }
    }

    /// Returns the bitmap of vnodes covered by this watermark.
    pub fn vnode_bitmap(&self) -> &Bitmap {
        &self.vnode_bitmap
    }

    /// Returns the total vnode count, taken as the length of the bitmap
    /// (not the number of set bits).
    pub fn vnode_count(&self) -> usize {
        self.vnode_bitmap.len()
    }

    /// Returns the watermark bytes.
    pub fn watermark(&self) -> &Bytes {
        &self.watermark
    }

    /// Converts to the protobuf representation via `From<&VnodeWatermark>`.
    pub fn to_protobuf(&self) -> PbVnodeWatermark {
        self.into()
    }
}
377
378impl From<PbVnodeWatermark> for VnodeWatermark {
379 fn from(pb: PbVnodeWatermark) -> Self {
380 Self {
381 vnode_bitmap: Arc::new(Bitmap::from(pb.vnode_bitmap.as_ref().unwrap())),
382 watermark: Bytes::from(pb.watermark),
383 }
384 }
385}
386
387impl From<&PbVnodeWatermark> for VnodeWatermark {
388 fn from(pb: &PbVnodeWatermark) -> Self {
389 Self {
390 vnode_bitmap: Arc::new(Bitmap::from(pb.vnode_bitmap.as_ref().unwrap())),
391 watermark: Bytes::from(pb.watermark.clone()),
392 }
393 }
394}
395
396impl From<VnodeWatermark> for PbVnodeWatermark {
397 fn from(watermark: VnodeWatermark) -> Self {
398 Self {
399 watermark: watermark.watermark.into(),
400 vnode_bitmap: Some(watermark.vnode_bitmap.to_protobuf()),
401 }
402 }
403}
404
405impl From<&VnodeWatermark> for PbVnodeWatermark {
406 fn from(watermark: &VnodeWatermark) -> Self {
407 Self {
408 watermark: watermark.watermark.to_vec(),
409 vnode_bitmap: Some(watermark.vnode_bitmap.to_protobuf()),
410 }
411 }
412}
413
/// Watermarks of a single table, grouped by the epoch that wrote them.
#[derive(Clone, Debug, PartialEq)]
pub struct TableWatermarks {
    /// Per-epoch vnode watermarks. `add_new_epoch_watermarks` only accepts
    /// strictly increasing epochs, so entries are in ascending epoch order.
    pub watermarks: Vec<(HummockEpoch, Arc<[VnodeWatermark]>)>,
    /// Direction in which the watermarks advance.
    pub direction: WatermarkDirection,
    /// Kind of watermark stored for this table.
    pub watermark_type: WatermarkSerdeType,
}
421
422impl TableWatermarks {
423 pub fn single_epoch(
424 epoch: HummockEpoch,
425 watermarks: Vec<VnodeWatermark>,
426 direction: WatermarkDirection,
427 watermark_type: WatermarkSerdeType,
428 ) -> Self {
429 let mut this = Self {
430 direction,
431 watermarks: Vec::new(),
432 watermark_type,
433 };
434 this.add_new_epoch_watermarks(epoch, watermarks.into(), direction, watermark_type);
435 this
436 }
437
438 pub fn add_new_epoch_watermarks(
439 &mut self,
440 epoch: HummockEpoch,
441 watermarks: Arc<[VnodeWatermark]>,
442 direction: WatermarkDirection,
443 watermark_type: WatermarkSerdeType,
444 ) {
445 assert_eq!(self.direction, direction);
446 assert_eq!(self.watermark_type, watermark_type);
447
448 if let Some((prev_epoch, _)) = self.watermarks.last() {
449 assert!(*prev_epoch < epoch);
450 }
451 if !watermarks.is_empty() {
452 let vnode_count = watermarks[0].vnode_count();
453 for watermark in &*watermarks {
454 assert_eq!(watermark.vnode_count(), vnode_count);
455 }
456 if let Some(existing_vnode_count) = self.vnode_count() {
457 assert_eq!(existing_vnode_count, vnode_count);
458 }
459 }
460 self.watermarks.push((epoch, watermarks));
461 }
462
463 fn vnode_count(&self) -> Option<usize> {
465 self.watermarks
466 .iter()
467 .flat_map(|(_, watermarks)| watermarks.as_ref())
468 .next()
469 .map(|w| w.vnode_count())
470 }
471
472 pub fn from_protobuf(pb: &PbTableWatermarks) -> Self {
473 Self {
474 watermarks: pb
475 .epoch_watermarks
476 .iter()
477 .map(|epoch_watermark| {
478 let epoch = epoch_watermark.epoch;
479 let watermarks = epoch_watermark
480 .watermarks
481 .iter()
482 .map(VnodeWatermark::from)
483 .collect_vec();
484 (epoch, Arc::from(watermarks))
485 })
486 .collect(),
487 direction: if pb.is_ascending {
488 WatermarkDirection::Ascending
489 } else {
490 WatermarkDirection::Descending
491 },
492 watermark_type: if pb.is_non_pk_prefix {
493 WatermarkSerdeType::NonPkPrefix
494 } else {
495 WatermarkSerdeType::PkPrefix
496 },
497 }
498 }
499}
500
501pub fn merge_multiple_new_table_watermarks(
502 table_watermarks_list: impl IntoIterator<Item = HashMap<TableId, TableWatermarks>>,
503) -> HashMap<TableId, TableWatermarks> {
504 #[allow(clippy::type_complexity)]
505 let mut ret: HashMap<
506 TableId,
507 (
508 WatermarkDirection,
509 BTreeMap<u64, Vec<VnodeWatermark>>,
510 WatermarkSerdeType,
511 ),
512 > = HashMap::new();
513 for table_watermarks in table_watermarks_list {
514 for (table_id, new_table_watermarks) in table_watermarks {
515 let epoch_watermarks = match ret.entry(table_id) {
516 Entry::Occupied(entry) => {
517 let (direction, epoch_watermarks, watermark_type) = entry.into_mut();
518 assert_eq!(&new_table_watermarks.direction, direction);
519 assert_eq!(&new_table_watermarks.watermark_type, watermark_type);
520 epoch_watermarks
521 }
522 Entry::Vacant(entry) => {
523 let (_, epoch_watermarks, _) = entry.insert((
524 new_table_watermarks.direction,
525 BTreeMap::new(),
526 new_table_watermarks.watermark_type,
527 ));
528 epoch_watermarks
529 }
530 };
531 for (new_epoch, new_epoch_watermarks) in new_table_watermarks.watermarks {
532 epoch_watermarks
533 .entry(new_epoch)
534 .or_insert_with(Vec::new)
535 .extend(new_epoch_watermarks.iter().cloned());
536 }
537 }
538 }
539 ret.into_iter()
540 .map(
541 |(table_id, (direction, epoch_watermarks, watermark_type))| {
542 (
543 table_id,
544 TableWatermarks {
545 direction,
546 watermarks: epoch_watermarks
548 .into_iter()
549 .map(|(epoch, watermarks)| (epoch, Arc::from(watermarks)))
550 .collect(),
551 watermark_type,
552 },
553 )
554 },
555 )
556 .collect()
557}
558
559impl TableWatermarks {
560 pub fn apply_new_table_watermarks(&mut self, newly_added_watermarks: &TableWatermarks) {
561 assert_eq!(self.direction, newly_added_watermarks.direction);
562 assert!(self.watermarks.iter().map(|(epoch, _)| epoch).is_sorted());
563 assert!(
564 newly_added_watermarks
565 .watermarks
566 .iter()
567 .map(|(epoch, _)| epoch)
568 .is_sorted()
569 );
570 if let Some((prev_last_epoch, _)) = self.watermarks.last()
572 && let Some((new_first_epoch, _)) = newly_added_watermarks.watermarks.first()
573 {
574 assert!(prev_last_epoch < new_first_epoch);
575 }
576 self.watermarks.extend(
577 newly_added_watermarks
578 .watermarks
579 .iter()
580 .map(|(epoch, new_watermarks)| (*epoch, new_watermarks.clone())),
581 );
582 }
583
    /// Drops the watermark entries of epochs older than `safe_epoch`.
    ///
    /// Entries at epochs `>= safe_epoch` are kept untouched. For every vnode
    /// that is not covered by any kept entry, the most recent older watermark
    /// of that vnode is preserved and re-attached at `safe_epoch`; all other
    /// information from older epochs is discarded. The result is stored back
    /// into `self` in ascending epoch order.
    ///
    /// Does nothing when there are no watermarks or when the earliest stored
    /// epoch is already `>= safe_epoch`.
    pub fn clear_stale_epoch_watermark(&mut self, safe_epoch: u64) {
        match self.watermarks.first() {
            None => {
                // Nothing stored at all.
                return;
            }
            Some((earliest_epoch, _)) => {
                if *earliest_epoch >= safe_epoch {
                    // No epoch is older than `safe_epoch`, nothing to clear.
                    return;
                }
            }
        }
        debug!("clear stale table watermark below epoch {}", safe_epoch);
        // Built from the newest epoch to the oldest; reversed at the end.
        let mut result_epoch_watermark = Vec::with_capacity(self.watermarks.len());
        // Vnodes that already have a watermark in the result.
        let mut set_vnode: HashSet<VirtualNode> = HashSet::new();
        // Filled in from the first vnode watermark encountered.
        let mut vnode_count: Option<usize> = None;
        // Phase 1: move every epoch >= safe_epoch into the result unchanged,
        // recording which vnodes those epochs cover.
        while let Some((epoch, _)) = self.watermarks.last() {
            if *epoch >= safe_epoch {
                let (epoch, watermarks) = self.watermarks.pop().expect("have check Some");
                for watermark in watermarks.as_ref() {
                    vnode_count.get_or_insert_with(|| watermark.vnode_count());
                    for vnode in watermark.vnode_bitmap.iter_vnodes() {
                        set_vnode.insert(vnode);
                    }
                }
                result_epoch_watermark.push((epoch, watermarks));
            } else {
                break;
            }
        }
        // Phase 2: walk the stale epochs from newest to oldest until every
        // vnode is covered or no epoch is left, keeping only the watermarks of
        // vnodes that have not been covered yet.
        while vnode_count != Some(set_vnode.len())
            && let Some((_, watermarks)) = self.watermarks.pop()
        {
            let mut new_vnode_watermarks = Vec::new();
            for vnode_watermark in watermarks.as_ref() {
                let mut new_set_vnode = Vec::new();
                vnode_count.get_or_insert_with(|| vnode_watermark.vnode_count());
                for vnode in vnode_watermark.vnode_bitmap.iter_vnodes() {
                    // `insert` returns true only for vnodes seen for the first time.
                    if set_vnode.insert(vnode) {
                        new_set_vnode.push(vnode);
                    }
                }
                if !new_set_vnode.is_empty() {
                    // Rebuild the bitmap so it only contains the newly covered vnodes.
                    let mut builder = BitmapBuilder::zeroed(vnode_watermark.vnode_count());
                    for vnode in new_set_vnode {
                        builder.set(vnode.to_index(), true);
                    }
                    let bitmap = Arc::new(builder.finish());
                    new_vnode_watermarks.push(VnodeWatermark {
                        vnode_bitmap: bitmap,
                        watermark: vnode_watermark.watermark.clone(),
                    })
                }
            }
            if !new_vnode_watermarks.is_empty() {
                // Salvaged watermarks are recorded at `safe_epoch`: appended to
                // the existing `safe_epoch` entry when the result currently
                // ends with one, otherwise pushed as a new entry.
                if let Some((last_epoch, last_watermarks)) = result_epoch_watermark.last_mut()
                    && *last_epoch == safe_epoch
                {
                    *last_watermarks = Arc::from(
                        last_watermarks
                            .iter()
                            .cloned()
                            .chain(new_vnode_watermarks.into_iter())
                            .collect_vec(),
                    );
                } else {
                    result_epoch_watermark.push((safe_epoch, Arc::from(new_vnode_watermarks)));
                }
            }
        }
        // Entries were collected from later to earlier epochs; restore
        // ascending order.
        result_epoch_watermark.reverse();
        assert!(
            result_epoch_watermark
                .is_sorted_by(|(first_epoch, _), (second_epoch, _)| { first_epoch < second_epoch })
        );
        *self = TableWatermarks {
            watermarks: result_epoch_watermark,
            direction: self.direction,
            watermark_type: self.watermark_type,
        }
    }
668}
669
670impl TableWatermarks {
671 pub fn estimated_encode_len(&self) -> usize {
672 self.watermarks.len() * size_of::<HummockEpoch>()
673 + self
674 .watermarks
675 .iter()
676 .map(|(_, watermarks)| {
677 watermarks
678 .iter()
679 .map(|watermark| watermark.estimated_size())
680 .sum::<usize>()
681 })
682 .sum::<usize>()
683 + size_of::<bool>() }
685
686 pub fn to_protobuf(&self) -> PbTableWatermarks {
687 self.into()
688 }
689}
690
691impl From<&PbTableWatermarks> for TableWatermarks {
692 fn from(pb: &PbTableWatermarks) -> Self {
693 Self {
694 watermarks: pb
695 .epoch_watermarks
696 .iter()
697 .map(|epoch_watermark| {
698 let epoch = epoch_watermark.epoch;
699 let watermarks = epoch_watermark
700 .watermarks
701 .iter()
702 .map(VnodeWatermark::from)
703 .collect();
704 (epoch, watermarks)
705 })
706 .collect(),
707 direction: if pb.is_ascending {
708 WatermarkDirection::Ascending
709 } else {
710 WatermarkDirection::Descending
711 },
712 watermark_type: if pb.is_non_pk_prefix {
713 WatermarkSerdeType::NonPkPrefix
714 } else {
715 WatermarkSerdeType::PkPrefix
716 },
717 }
718 }
719}
720
721impl From<&TableWatermarks> for PbTableWatermarks {
722 fn from(table_watermarks: &TableWatermarks) -> Self {
723 Self {
724 epoch_watermarks: table_watermarks
725 .watermarks
726 .iter()
727 .map(|(epoch, watermarks)| PbEpochNewWatermarks {
728 watermarks: watermarks.iter().map(|wm| wm.into()).collect(),
729 epoch: *epoch,
730 })
731 .collect(),
732 is_ascending: match table_watermarks.direction {
733 WatermarkDirection::Ascending => true,
734 WatermarkDirection::Descending => false,
735 },
736 is_non_pk_prefix: match table_watermarks.watermark_type {
737 WatermarkSerdeType::NonPkPrefix => true,
738 WatermarkSerdeType::PkPrefix => false,
739 },
740 }
741 }
742}
743
744impl From<PbTableWatermarks> for TableWatermarks {
745 fn from(pb: PbTableWatermarks) -> Self {
746 Self {
747 watermarks: pb
748 .epoch_watermarks
749 .into_iter()
750 .map(|epoch_watermark| {
751 let epoch = epoch_watermark.epoch;
752 let watermarks = epoch_watermark
753 .watermarks
754 .into_iter()
755 .map(VnodeWatermark::from)
756 .collect();
757 (epoch, watermarks)
758 })
759 .collect(),
760 direction: if pb.is_ascending {
761 WatermarkDirection::Ascending
762 } else {
763 WatermarkDirection::Descending
764 },
765 watermark_type: if pb.is_non_pk_prefix {
766 WatermarkSerdeType::NonPkPrefix
767 } else {
768 WatermarkSerdeType::PkPrefix
769 },
770 }
771 }
772}
773
774impl From<TableWatermarks> for PbTableWatermarks {
775 fn from(table_watermarks: TableWatermarks) -> Self {
776 Self {
777 epoch_watermarks: table_watermarks
778 .watermarks
779 .into_iter()
780 .map(|(epoch, watermarks)| PbEpochNewWatermarks {
781 watermarks: watermarks.iter().map(PbVnodeWatermark::from).collect(),
782 epoch,
783 })
784 .collect(),
785 is_ascending: match table_watermarks.direction {
786 WatermarkDirection::Ascending => true,
787 WatermarkDirection::Descending => false,
788 },
789 is_non_pk_prefix: match table_watermarks.watermark_type {
790 WatermarkSerdeType::NonPkPrefix => true,
791 WatermarkSerdeType::PkPrefix => false,
792 },
793 }
794 }
795}
796
/// Kind of watermark stored for a table.
///
/// In protobuf it is carried by the `is_non_pk_prefix` flag of
/// `PbTableWatermarks`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WatermarkSerdeType {
    /// Encoded as `is_non_pk_prefix = false`. This is the only type accepted
    /// by `PkPrefixTableWatermarksIndex`.
    PkPrefix,
    /// Encoded as `is_non_pk_prefix = true`.
    NonPkPrefix,
}
802
803#[cfg(test)]
804mod tests {
805 use std::collections::Bound::Included;
806 use std::collections::{Bound, HashMap};
807 use std::ops::Bound::Excluded;
808 use std::sync::Arc;
809 use std::vec;
810
811 use bytes::Bytes;
812 use itertools::Itertools;
813 use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
814 use risingwave_common::catalog::TableId;
815 use risingwave_common::hash::VirtualNode;
816 use risingwave_common::util::epoch::{EpochExt, test_epoch};
817 use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo};
818
819 use crate::compaction_group::StaticCompactionGroupId;
820 use crate::key::{TableKeyRange, is_empty_key_range, prefixed_range_with_vnode};
821 use crate::table_watermark::{
822 PkPrefixTableWatermarksIndex, TableWatermarks, VnodeWatermark, WatermarkDirection,
823 WatermarkSerdeType, merge_multiple_new_table_watermarks,
824 };
825 use crate::version::HummockVersion;
826
827 fn build_bitmap(vnodes: impl IntoIterator<Item = usize>) -> Arc<Bitmap> {
828 let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
829 for vnode in vnodes {
830 builder.set(vnode, true);
831 }
832 Arc::new(builder.finish())
833 }
834
835 #[test]
836 fn test_apply_new_table_watermark() {
837 let epoch1 = test_epoch(1);
838 let direction = WatermarkDirection::Ascending;
839 let watermark1 = Bytes::from("watermark1");
840 let watermark2 = Bytes::from("watermark2");
841 let watermark3 = Bytes::from("watermark3");
842 let watermark4 = Bytes::from("watermark4");
843 let watermark_type = WatermarkSerdeType::PkPrefix;
844 let mut table_watermarks = TableWatermarks::single_epoch(
845 epoch1,
846 vec![VnodeWatermark::new(build_bitmap(vec![0, 1, 2]), watermark1)],
847 direction,
848 watermark_type,
849 );
850 let epoch2 = epoch1.next_epoch();
851 table_watermarks.add_new_epoch_watermarks(
852 epoch2,
853 vec![VnodeWatermark::new(
854 build_bitmap(vec![0, 1, 2, 3]),
855 watermark2,
856 )]
857 .into(),
858 direction,
859 watermark_type,
860 );
861
862 let mut table_watermark_checkpoint = table_watermarks.clone();
863
864 let epoch3 = epoch2.next_epoch();
865 let mut second_table_watermark = TableWatermarks::single_epoch(
866 epoch3,
867 vec![VnodeWatermark::new(
868 build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
869 watermark3.clone(),
870 )],
871 direction,
872 watermark_type,
873 );
874 table_watermarks.add_new_epoch_watermarks(
875 epoch3,
876 vec![VnodeWatermark::new(
877 build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
878 watermark3,
879 )]
880 .into(),
881 direction,
882 watermark_type,
883 );
884 let epoch4 = epoch3.next_epoch();
885 let epoch5 = epoch4.next_epoch();
886 table_watermarks.add_new_epoch_watermarks(
887 epoch5,
888 vec![VnodeWatermark::new(
889 build_bitmap(vec![0, 3, 4]),
890 watermark4.clone(),
891 )]
892 .into(),
893 direction,
894 watermark_type,
895 );
896 second_table_watermark.add_new_epoch_watermarks(
897 epoch5,
898 vec![VnodeWatermark::new(build_bitmap(vec![0, 3, 4]), watermark4)].into(),
899 direction,
900 watermark_type,
901 );
902
903 table_watermark_checkpoint.apply_new_table_watermarks(&second_table_watermark);
904 assert_eq!(table_watermarks, table_watermark_checkpoint);
905 }
906
907 #[test]
908 fn test_clear_stale_epoch_watmermark() {
909 let epoch1 = test_epoch(1);
910 let direction = WatermarkDirection::Ascending;
911 let watermark1 = Bytes::from("watermark1");
912 let watermark2 = Bytes::from("watermark2");
913 let watermark3 = Bytes::from("watermark3");
914 let watermark4 = Bytes::from("watermark4");
915 let watermark_type = WatermarkSerdeType::PkPrefix;
916 let mut table_watermarks = TableWatermarks::single_epoch(
917 epoch1,
918 vec![VnodeWatermark::new(build_bitmap(vec![0, 1, 2]), watermark1)],
919 direction,
920 watermark_type,
921 );
922 let epoch2 = epoch1.next_epoch();
923 table_watermarks.add_new_epoch_watermarks(
924 epoch2,
925 vec![VnodeWatermark::new(
926 build_bitmap(vec![0, 1, 2, 3]),
927 watermark2.clone(),
928 )]
929 .into(),
930 direction,
931 watermark_type,
932 );
933 let epoch3 = epoch2.next_epoch();
934 table_watermarks.add_new_epoch_watermarks(
935 epoch3,
936 vec![VnodeWatermark::new(
937 build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
938 watermark3.clone(),
939 )]
940 .into(),
941 direction,
942 watermark_type,
943 );
944 let epoch4 = epoch3.next_epoch();
945 let epoch5 = epoch4.next_epoch();
946 table_watermarks.add_new_epoch_watermarks(
947 epoch5,
948 vec![VnodeWatermark::new(
949 build_bitmap(vec![0, 3, 4]),
950 watermark4.clone(),
951 )]
952 .into(),
953 direction,
954 watermark_type,
955 );
956
957 let mut table_watermarks_checkpoint = table_watermarks.clone();
958 table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch1);
959 assert_eq!(table_watermarks_checkpoint, table_watermarks);
960
961 table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch2);
962 assert_eq!(
963 table_watermarks_checkpoint,
964 TableWatermarks {
965 watermarks: vec![
966 (
967 epoch2,
968 vec![VnodeWatermark::new(
969 build_bitmap(vec![0, 1, 2, 3]),
970 watermark2,
971 )]
972 .into()
973 ),
974 (
975 epoch3,
976 vec![VnodeWatermark::new(
977 build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
978 watermark3.clone(),
979 )]
980 .into()
981 ),
982 (
983 epoch5,
984 vec![VnodeWatermark::new(
985 build_bitmap(vec![0, 3, 4]),
986 watermark4.clone(),
987 )]
988 .into()
989 )
990 ],
991 direction,
992 watermark_type,
993 }
994 );
995
996 table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch3);
997 assert_eq!(
998 table_watermarks_checkpoint,
999 TableWatermarks {
1000 watermarks: vec![
1001 (
1002 epoch3,
1003 vec![VnodeWatermark::new(
1004 build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
1005 watermark3.clone(),
1006 )]
1007 .into()
1008 ),
1009 (
1010 epoch5,
1011 vec![VnodeWatermark::new(
1012 build_bitmap(vec![0, 3, 4]),
1013 watermark4.clone(),
1014 )]
1015 .into()
1016 )
1017 ],
1018 direction,
1019 watermark_type,
1020 }
1021 );
1022
1023 table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch4);
1024 assert_eq!(
1025 table_watermarks_checkpoint,
1026 TableWatermarks {
1027 watermarks: vec![
1028 (
1029 epoch4,
1030 vec![VnodeWatermark::new(
1031 build_bitmap((1..3).chain(5..VirtualNode::COUNT_FOR_TEST)),
1032 watermark3.clone()
1033 )]
1034 .into()
1035 ),
1036 (
1037 epoch5,
1038 vec![VnodeWatermark::new(
1039 build_bitmap(vec![0, 3, 4]),
1040 watermark4.clone(),
1041 )]
1042 .into()
1043 )
1044 ],
1045 direction,
1046 watermark_type,
1047 }
1048 );
1049
1050 table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch5);
1051 assert_eq!(
1052 table_watermarks_checkpoint,
1053 TableWatermarks {
1054 watermarks: vec![(
1055 epoch5,
1056 vec![
1057 VnodeWatermark::new(build_bitmap(vec![0, 3, 4]), watermark4),
1058 VnodeWatermark::new(
1059 build_bitmap((1..3).chain(5..VirtualNode::COUNT_FOR_TEST)),
1060 watermark3
1061 )
1062 ]
1063 .into()
1064 )],
1065 direction,
1066 watermark_type,
1067 }
1068 );
1069 }
1070
1071 #[test]
1072 fn test_merge_multiple_new_table_watermarks() {
1073 fn epoch_new_watermark(epoch: u64, bitmaps: Vec<&Bitmap>) -> (u64, Arc<[VnodeWatermark]>) {
1074 (
1075 epoch,
1076 bitmaps
1077 .into_iter()
1078 .map(|bitmap| VnodeWatermark {
1079 watermark: Bytes::from(vec![1, 2, epoch as _]),
1080 vnode_bitmap: Arc::new(bitmap.clone()),
1081 })
1082 .collect_vec()
1083 .into(),
1084 )
1085 }
1086 fn build_table_watermark(
1087 vnodes: impl IntoIterator<Item = usize>,
1088 epochs: impl IntoIterator<Item = u64>,
1089 ) -> TableWatermarks {
1090 let bitmap = build_bitmap(vnodes);
1091 TableWatermarks {
1092 watermarks: epochs
1093 .into_iter()
1094 .map(|epoch: u64| epoch_new_watermark(epoch, vec![&bitmap]))
1095 .collect(),
1096 direction: WatermarkDirection::Ascending,
1097 watermark_type: WatermarkSerdeType::PkPrefix,
1098 }
1099 }
1100 let table1_watermark1 = build_table_watermark(0..3, vec![1, 2, 4]);
1101 let table1_watermark2 = build_table_watermark(4..6, vec![1, 2, 5]);
1102 let table2_watermark = build_table_watermark(0..4, 1..3);
1103 let table3_watermark = build_table_watermark(0..4, 3..5);
1104 let mut first = HashMap::new();
1105 first.insert(TableId::new(1), table1_watermark1);
1106 first.insert(TableId::new(2), table2_watermark.clone());
1107 let mut second = HashMap::new();
1108 second.insert(TableId::new(1), table1_watermark2);
1109 second.insert(TableId::new(3), table3_watermark.clone());
1110 let result = merge_multiple_new_table_watermarks(vec![first, second]);
1111 let mut expected = HashMap::new();
1112 expected.insert(
1113 TableId::new(1),
1114 TableWatermarks {
1115 watermarks: vec![
1116 epoch_new_watermark(1, vec![&build_bitmap(0..3), &build_bitmap(4..6)]),
1117 epoch_new_watermark(2, vec![&build_bitmap(0..3), &build_bitmap(4..6)]),
1118 epoch_new_watermark(4, vec![&build_bitmap(0..3)]),
1119 epoch_new_watermark(5, vec![&build_bitmap(4..6)]),
1120 ],
1121 direction: WatermarkDirection::Ascending,
1122 watermark_type: WatermarkSerdeType::PkPrefix,
1123 },
1124 );
1125 expected.insert(TableId::new(2), table2_watermark);
1126 expected.insert(TableId::new(3), table3_watermark);
1127 assert_eq!(result, expected);
1128 }
1129
1130 const COMMITTED_EPOCH: u64 = test_epoch(1);
1131 const EPOCH1: u64 = test_epoch(2);
1132 const EPOCH2: u64 = test_epoch(3);
1133 const TEST_SINGLE_VNODE: VirtualNode = VirtualNode::from_index(1);
1134
1135 fn build_watermark_range(
1136 direction: WatermarkDirection,
1137 (low, high): (Bound<Bytes>, Bound<Bytes>),
1138 ) -> TableKeyRange {
1139 let range = match direction {
1140 WatermarkDirection::Ascending => (low, high),
1141 WatermarkDirection::Descending => (high, low),
1142 };
1143 prefixed_range_with_vnode(range, TEST_SINGLE_VNODE)
1144 }
1145
1146 fn build_and_test_watermark_index(
1150 direction: WatermarkDirection,
1151 watermark1: Bytes,
1152 watermark2: Bytes,
1153 watermark3: Bytes,
1154 ) -> PkPrefixTableWatermarksIndex {
1155 let mut index = PkPrefixTableWatermarksIndex::new(
1156 direction,
1157 EPOCH1,
1158 vec![VnodeWatermark::new(build_bitmap(0..4), watermark1.clone())],
1159 Some(COMMITTED_EPOCH),
1160 );
1161 index.add_epoch_watermark(
1162 EPOCH2,
1163 vec![VnodeWatermark::new(build_bitmap(1..5), watermark2.clone())].into(),
1164 direction,
1165 );
1166
1167 assert_eq!(
1168 index.read_watermark(VirtualNode::from_index(0), EPOCH1),
1169 Some(watermark1.clone())
1170 );
1171 assert_eq!(
1172 index.read_watermark(VirtualNode::from_index(1), EPOCH1),
1173 Some(watermark1.clone())
1174 );
1175 assert_eq!(
1176 index.read_watermark(VirtualNode::from_index(4), EPOCH1),
1177 None
1178 );
1179 assert_eq!(
1180 index.read_watermark(VirtualNode::from_index(0), EPOCH2),
1181 Some(watermark1.clone())
1182 );
1183 assert_eq!(
1184 index.read_watermark(VirtualNode::from_index(1), EPOCH2),
1185 Some(watermark2.clone())
1186 );
1187 assert_eq!(
1188 index.read_watermark(VirtualNode::from_index(4), EPOCH2),
1189 Some(watermark2.clone())
1190 );
1191 assert_eq!(
1192 index.latest_watermark(VirtualNode::from_index(0)),
1193 Some(watermark1.clone())
1194 );
1195 assert_eq!(
1196 index.latest_watermark(VirtualNode::from_index(1)),
1197 Some(watermark2.clone())
1198 );
1199 assert_eq!(
1200 index.latest_watermark(VirtualNode::from_index(4)),
1201 Some(watermark2.clone())
1202 );
1203
1204 let check_watermark_range =
1206 |query_range: (Bound<Bytes>, Bound<Bytes>),
1207 output_range: Option<(Bound<Bytes>, Bound<Bytes>)>| {
1208 let mut range = build_watermark_range(direction, query_range);
1209 index.rewrite_range_with_table_watermark(EPOCH2, &mut range);
1210 if let Some(output_range) = output_range {
1211 assert_eq!(range, build_watermark_range(direction, output_range));
1212 } else {
1213 assert!(is_empty_key_range(&range));
1214 }
1215 };
1216
1217 check_watermark_range(
1219 (Included(watermark1.clone()), Excluded(watermark3.clone())),
1220 Some((Included(watermark2.clone()), Excluded(watermark3.clone()))),
1221 );
1222
1223 check_watermark_range(
1225 (Included(watermark2.clone()), Excluded(watermark3.clone())),
1226 Some((Included(watermark2.clone()), Excluded(watermark3.clone()))),
1227 );
1228 check_watermark_range(
1229 (Excluded(watermark2.clone()), Excluded(watermark3.clone())),
1230 Some((Excluded(watermark2.clone()), Excluded(watermark3))),
1231 );
1232
1233 check_watermark_range(
1235 (Excluded(watermark1.clone()), Excluded(watermark2.clone())),
1236 None,
1237 );
1238 check_watermark_range(
1239 (Excluded(watermark1), Included(watermark2.clone())),
1240 Some((Included(watermark2.clone()), Included(watermark2))),
1241 );
1242
1243 index
1244 }
1245
/// Runs the shared index checks with an ascending watermark and byte-wise increasing values.
#[test]
fn test_watermark_index_ascending() {
    build_and_test_watermark_index(
        WatermarkDirection::Ascending,
        Bytes::from_static(b"watermark1"),
        Bytes::from_static(b"watermark2"),
        Bytes::from_static(b"watermark3"),
    );
}
1258
/// Runs the shared index checks with a descending watermark and byte-wise decreasing values.
#[test]
fn test_watermark_index_descending() {
    build_and_test_watermark_index(
        WatermarkDirection::Descending,
        Bytes::from_static(b"watermark254"),
        Bytes::from_static(b"watermark253"),
        Bytes::from_static(b"watermark252"),
    );
}
1271
/// Applies committed watermarks for `EPOCH1` on top of the staging index and checks that
/// reads at `EPOCH1` and `EPOCH2` still return the expected watermark for every test vnode.
#[test]
fn test_apply_committed_index() {
    let watermark1 = Bytes::from_static(b"watermark1");
    let watermark2 = Bytes::from_static(b"watermark2");
    let mut index = build_and_test_watermark_index(
        WatermarkDirection::Ascending,
        watermark1.clone(),
        watermark2.clone(),
        Bytes::from_static(b"watermark3"),
    );

    let test_table_id = TableId::from(233);

    // A version in which the test table has been committed up to `EPOCH1`.
    let table_info = StateTableInfo {
        committed_epoch: EPOCH1,
        compaction_group_id: StaticCompactionGroupId::StateDefault as _,
    };
    let mut version = HummockVersion::from_rpc_protobuf(&PbHummockVersion {
        state_table_info: HashMap::from_iter([(test_table_id.table_id, table_info)]),
        ..Default::default()
    });

    // The committed watermark at `EPOCH1` covers every test vnode with `watermark1`.
    let committed_watermarks = TableWatermarks {
        watermarks: vec![(
            EPOCH1,
            vec![VnodeWatermark {
                watermark: watermark1.clone(),
                vnode_bitmap: build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
            }]
            .into(),
        )],
        direction: WatermarkDirection::Ascending,
        watermark_type: WatermarkSerdeType::PkPrefix,
    };
    version
        .table_watermarks
        .insert(test_table_id, committed_watermarks.into());

    let committed = version
        .table_watermarks
        .get(&test_table_id)
        .cloned()
        .unwrap();
    index.apply_committed_watermarks(committed, EPOCH1);

    assert_eq!(EPOCH1, index.committed_epoch.unwrap());
    assert_eq!(EPOCH2, index.latest_epoch);

    for vnode in (0..VirtualNode::COUNT_FOR_TEST).map(VirtualNode::from_index) {
        // Every vnode reads `watermark1` at `EPOCH1`.
        assert_eq!(watermark1, index.read_watermark(vnode, EPOCH1).unwrap());
        // Only vnodes 1..5 received `watermark2` in the `EPOCH2` staging watermark.
        if (1..5).contains(&vnode.to_index()) {
            assert_eq!(watermark2, index.read_watermark(vnode, EPOCH2).unwrap());
        }
    }
}
1332
/// Feeds a mix of watermarks to `filter_regress_watermarks` and checks which vnodes survive.
///
/// The index comes from `build_and_test_watermark_index`, whose own assertions show that
/// vnode 0 currently reads `watermark1` while vnodes 1 and 4 read `watermark2`.
#[test]
fn test_filter_regress_watermark() {
    let watermark1 = Bytes::from_static(b"watermark1");
    let watermark2 = Bytes::from_static(b"watermark2");
    let watermark3 = Bytes::from_static(b"watermark3");
    let index = build_and_test_watermark_index(
        WatermarkDirection::Ascending,
        watermark1.clone(),
        watermark2,
        watermark3.clone(),
    );

    // Shorthand for a watermark covering a contiguous run of vnodes.
    let vnode_watermark = |vnodes: std::ops::Range<usize>, watermark: &Bytes| VnodeWatermark {
        vnode_bitmap: build_bitmap(vnodes),
        watermark: watermark.clone(),
    };

    let mut new_watermarks = vec![
        // Expected to shrink to vnode 0 only.
        vnode_watermark(0..2, &watermark1),
        // Expected to be kept unchanged.
        vnode_watermark(2..4, &watermark3),
        // Expected to be dropped entirely.
        vnode_watermark(4..5, &watermark1),
        // Expected to be kept unchanged.
        vnode_watermark(5..6, &watermark3),
    ];

    index.filter_regress_watermarks(&mut new_watermarks);

    let expected = vec![
        vnode_watermark(0..1, &watermark1),
        vnode_watermark(2..4, &watermark3),
        vnode_watermark(5..6, &watermark3),
    ];
    assert_eq!(new_watermarks, expected);
}
1388}