1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
17use std::fmt::Display;
18use std::mem::size_of;
19use std::ops::Bound::{Excluded, Included, Unbounded};
20use std::sync::Arc;
21
22use bytes::Bytes;
23use itertools::Itertools;
24use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
25use risingwave_common::catalog::TableId;
26use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
27use risingwave_common::types::ToDatumRef;
28use risingwave_common::util::sort_util::{OrderType, cmp_datum};
29use risingwave_common_estimate_size::EstimateSize;
30use risingwave_pb::hummock::table_watermarks::PbEpochNewWatermarks;
31use risingwave_pb::hummock::{PbVnodeWatermark, TableWatermarks as PbTableWatermarks};
32use tracing::{debug, warn};
33
34use crate::HummockEpoch;
35use crate::key::{TableKey, TableKeyRange, prefix_slice_with_vnode, vnode};
36
37#[derive(Clone)]
38pub struct ReadTableWatermark {
39 pub direction: WatermarkDirection,
40 pub vnode_watermarks: BTreeMap<VirtualNode, Bytes>,
41}
42
43#[derive(Clone)]
44pub struct TableWatermarksIndex {
45 pub watermark_direction: WatermarkDirection,
46 pub staging_watermarks: VecDeque<(HummockEpoch, Arc<[VnodeWatermark]>)>,
48 pub committed_watermarks: Option<Arc<TableWatermarks>>,
49 latest_epoch: HummockEpoch,
50 committed_epoch: Option<HummockEpoch>,
51}
52
53impl TableWatermarksIndex {
54 pub fn new(
55 watermark_direction: WatermarkDirection,
56 first_epoch: HummockEpoch,
57 first_vnode_watermark: Vec<VnodeWatermark>,
58 committed_epoch: Option<HummockEpoch>,
59 ) -> Self {
60 if let Some(committed_epoch) = committed_epoch {
61 assert!(first_epoch > committed_epoch);
62 }
63 Self {
64 watermark_direction,
65 staging_watermarks: VecDeque::from_iter([(
66 first_epoch,
67 Arc::from(first_vnode_watermark),
68 )]),
69 committed_watermarks: None,
70 latest_epoch: first_epoch,
71 committed_epoch,
72 }
73 }
74
75 pub fn new_committed(
76 committed_watermarks: Arc<TableWatermarks>,
77 committed_epoch: HummockEpoch,
78 ) -> Self {
79 Self {
80 watermark_direction: committed_watermarks.direction,
81 staging_watermarks: VecDeque::new(),
82 committed_epoch: Some(committed_epoch),
83 latest_epoch: committed_epoch,
84 committed_watermarks: Some(committed_watermarks),
85 }
86 }
87
88 pub fn read_watermark(&self, vnode: VirtualNode, epoch: HummockEpoch) -> Option<Bytes> {
89 for (watermark_epoch, vnode_watermark_list) in self.staging_watermarks.iter().rev().chain(
91 self.committed_watermarks
92 .iter()
93 .flat_map(|watermarks| watermarks.watermarks.iter().rev()),
94 ) {
95 if *watermark_epoch > epoch {
96 continue;
97 }
98 for vnode_watermark in vnode_watermark_list.as_ref() {
99 if vnode_watermark.vnode_bitmap.is_set(vnode.to_index()) {
100 return Some(vnode_watermark.watermark.clone());
101 }
102 }
103 }
104 None
105 }
106
107 pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
108 self.read_watermark(vnode, HummockEpoch::MAX)
109 }
110
111 pub fn rewrite_range_with_table_watermark(
112 &self,
113 epoch: HummockEpoch,
114 key_range: &mut TableKeyRange,
115 ) {
116 let vnode = vnode(key_range);
117 if let Some(watermark) = self.read_watermark(vnode, epoch) {
118 match self.watermark_direction {
119 WatermarkDirection::Ascending => {
120 let overwrite_start_key = match &key_range.0 {
121 Included(start_key) | Excluded(start_key) => {
122 start_key.key_part() < watermark
123 }
124 Unbounded => true,
125 };
126 if overwrite_start_key {
127 let watermark_key = TableKey(prefix_slice_with_vnode(vnode, &watermark));
128 let fully_filtered = match &key_range.1 {
129 Included(end_key) => end_key < &watermark_key,
130 Excluded(end_key) => end_key <= &watermark_key,
131 Unbounded => false,
132 };
133 if fully_filtered {
134 key_range.1 = Excluded(watermark_key.clone());
135 }
136 key_range.0 = Included(watermark_key);
137 }
138 }
139 WatermarkDirection::Descending => {
140 let overwrite_end_key = match &key_range.1 {
141 Included(end_key) | Excluded(end_key) => end_key.key_part() > watermark,
142 Unbounded => true,
143 };
144 if overwrite_end_key {
145 let watermark_key = TableKey(prefix_slice_with_vnode(vnode, &watermark));
146 let fully_filtered = match &key_range.0 {
147 Included(start_key) => start_key > &watermark_key,
148 Excluded(start_key) => start_key >= &watermark_key,
149 Unbounded => false,
150 };
151 if fully_filtered {
152 *key_range = (Included(watermark_key.clone()), Excluded(watermark_key));
153 } else {
154 key_range.1 = Included(watermark_key);
155 }
156 }
157 }
158 }
159 }
160 }
161
162 pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
163 let mut ret = Vec::with_capacity(watermarks.len());
164 for watermark in watermarks.drain(..) {
165 let vnode_count = watermark.vnode_count();
166
167 let mut regress_vnodes = None;
168 for vnode in watermark.vnode_bitmap.iter_vnodes() {
169 if let Some(prev_watermark) = self.latest_watermark(vnode) {
170 let is_regress = match self.direction() {
171 WatermarkDirection::Ascending => prev_watermark > watermark.watermark,
172 WatermarkDirection::Descending => prev_watermark < watermark.watermark,
173 };
174 if is_regress {
175 warn!(
176 "table watermark regress: {:?} {} {:?} {:?}",
177 self.direction(),
178 vnode,
179 watermark.watermark,
180 prev_watermark
181 );
182 regress_vnodes
183 .get_or_insert_with(|| BitmapBuilder::zeroed(vnode_count))
184 .set(vnode.to_index(), true);
185 }
186 }
187 }
188 if let Some(regress_vnodes) = regress_vnodes {
189 let mut bitmap_builder = None;
190 for vnode in watermark.vnode_bitmap.iter_vnodes() {
191 let vnode_index = vnode.to_index();
192 if !regress_vnodes.is_set(vnode_index) {
193 bitmap_builder
194 .get_or_insert_with(|| BitmapBuilder::zeroed(vnode_count))
195 .set(vnode_index, true);
196 }
197 }
198 if let Some(bitmap_builder) = bitmap_builder {
199 ret.push(VnodeWatermark::new(
200 Arc::new(bitmap_builder.finish()),
201 watermark.watermark,
202 ));
203 }
204 } else {
205 ret.push(watermark);
207 }
208 }
209 *watermarks = ret;
210 }
211
212 pub fn direction(&self) -> WatermarkDirection {
213 self.watermark_direction
214 }
215
216 pub fn add_epoch_watermark(
217 &mut self,
218 epoch: HummockEpoch,
219 vnode_watermark_list: Arc<[VnodeWatermark]>,
220 direction: WatermarkDirection,
221 ) {
222 assert!(epoch > self.latest_epoch);
223 assert_eq!(self.watermark_direction, direction);
224 self.latest_epoch = epoch;
225 #[cfg(debug_assertions)]
226 if !vnode_watermark_list.is_empty() {
227 let vnode_count = vnode_watermark_list[0].vnode_count();
228 let mut vnode_is_set = BitmapBuilder::zeroed(vnode_count);
229 for vnode_watermark in vnode_watermark_list.as_ref() {
230 for vnode in vnode_watermark.vnode_bitmap.iter_ones() {
231 assert!(!vnode_is_set.is_set(vnode));
232 vnode_is_set.set(vnode, true);
233 let vnode = VirtualNode::from_index(vnode);
234 if let Some(prev_watermark) = self.latest_watermark(vnode) {
235 match self.watermark_direction {
236 WatermarkDirection::Ascending => {
237 assert!(vnode_watermark.watermark >= prev_watermark);
238 }
239 WatermarkDirection::Descending => {
240 assert!(vnode_watermark.watermark <= prev_watermark);
241 }
242 };
243 }
244 }
245 }
246 }
247 self.staging_watermarks
248 .push_back((epoch, vnode_watermark_list));
249 }
250
251 pub fn apply_committed_watermarks(
252 &mut self,
253 committed_watermark: Arc<TableWatermarks>,
254 committed_epoch: HummockEpoch,
255 ) {
256 assert_eq!(self.watermark_direction, committed_watermark.direction);
257 if let Some(prev_committed_epoch) = self.committed_epoch {
258 assert!(prev_committed_epoch <= committed_epoch);
259 if prev_committed_epoch == committed_epoch {
260 return;
261 }
262 }
263 if self.latest_epoch < committed_epoch {
264 debug!(
265 latest_epoch = self.latest_epoch,
266 committed_epoch, "committed_epoch exceed table watermark latest_epoch"
267 );
268 self.latest_epoch = committed_epoch;
269 }
270 self.committed_epoch = Some(committed_epoch);
271 self.committed_watermarks = Some(committed_watermark);
272 while let Some((old_epoch, _)) = self.staging_watermarks.front()
274 && *old_epoch <= committed_epoch
275 {
276 let _ = self.staging_watermarks.pop_front();
277 }
278 }
279}
280
/// Direction in which a table watermark is compared against keys.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum WatermarkDirection {
    /// Keys smaller than the watermark are the ones filtered out.
    Ascending,
    /// Keys greater than the watermark are the ones filtered out.
    Descending,
}
286
287impl Display for WatermarkDirection {
288 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
289 match self {
290 WatermarkDirection::Ascending => write!(f, "Ascending"),
291 WatermarkDirection::Descending => write!(f, "Descending"),
292 }
293 }
294}
295
296impl WatermarkDirection {
297 pub fn key_filter_by_watermark(
298 &self,
299 key: impl AsRef<[u8]>,
300 watermark: impl AsRef<[u8]>,
301 ) -> bool {
302 let key = key.as_ref();
303 let watermark = watermark.as_ref();
304 match self {
305 WatermarkDirection::Ascending => key < watermark,
306 WatermarkDirection::Descending => key > watermark,
307 }
308 }
309
310 pub fn datum_filter_by_watermark(
311 &self,
312 watermark_col_in_pk: impl ToDatumRef,
313 watermark: impl ToDatumRef,
314 order_type: OrderType,
315 ) -> bool {
316 let watermark_col_in_pk = watermark_col_in_pk.to_datum_ref();
317 let watermark = watermark.to_datum_ref();
318 match self {
319 WatermarkDirection::Ascending => {
320 cmp_datum(watermark_col_in_pk, watermark, order_type).is_lt()
322 }
323 WatermarkDirection::Descending => {
324 cmp_datum(watermark_col_in_pk, watermark, order_type).is_gt()
326 }
327 }
328 }
329
330 pub fn is_ascending(&self) -> bool {
331 match self {
332 WatermarkDirection::Ascending => true,
333 WatermarkDirection::Descending => false,
334 }
335 }
336}
337
338#[derive(Clone, Debug, PartialEq, EstimateSize)]
339pub struct VnodeWatermark {
340 vnode_bitmap: Arc<Bitmap>,
341 watermark: Bytes,
342}
343
344impl VnodeWatermark {
345 pub fn new(vnode_bitmap: Arc<Bitmap>, watermark: Bytes) -> Self {
346 Self {
347 vnode_bitmap,
348 watermark,
349 }
350 }
351
352 pub fn vnode_bitmap(&self) -> &Bitmap {
353 &self.vnode_bitmap
354 }
355
356 pub fn vnode_count(&self) -> usize {
358 self.vnode_bitmap.len()
359 }
360
361 pub fn watermark(&self) -> &Bytes {
362 &self.watermark
363 }
364
365 pub fn to_protobuf(&self) -> PbVnodeWatermark {
366 self.into()
367 }
368}
369
370impl From<PbVnodeWatermark> for VnodeWatermark {
371 fn from(pb: PbVnodeWatermark) -> Self {
372 Self {
373 vnode_bitmap: Arc::new(Bitmap::from(pb.vnode_bitmap.as_ref().unwrap())),
374 watermark: Bytes::from(pb.watermark),
375 }
376 }
377}
378
379impl From<&PbVnodeWatermark> for VnodeWatermark {
380 fn from(pb: &PbVnodeWatermark) -> Self {
381 Self {
382 vnode_bitmap: Arc::new(Bitmap::from(pb.vnode_bitmap.as_ref().unwrap())),
383 watermark: Bytes::from(pb.watermark.clone()),
384 }
385 }
386}
387
388impl From<VnodeWatermark> for PbVnodeWatermark {
389 fn from(watermark: VnodeWatermark) -> Self {
390 Self {
391 watermark: watermark.watermark.into(),
392 vnode_bitmap: Some(watermark.vnode_bitmap.to_protobuf()),
393 }
394 }
395}
396
397impl From<&VnodeWatermark> for PbVnodeWatermark {
398 fn from(watermark: &VnodeWatermark) -> Self {
399 Self {
400 watermark: watermark.watermark.to_vec(),
401 vnode_bitmap: Some(watermark.vnode_bitmap.to_protobuf()),
402 }
403 }
404}
405
406#[derive(Clone, Debug, PartialEq)]
407pub struct TableWatermarks {
408 pub watermarks: Vec<(HummockEpoch, Arc<[VnodeWatermark]>)>,
410 pub direction: WatermarkDirection,
411 pub watermark_type: WatermarkSerdeType,
412}
413
414impl TableWatermarks {
415 pub fn single_epoch(
416 epoch: HummockEpoch,
417 watermarks: Vec<VnodeWatermark>,
418 direction: WatermarkDirection,
419 watermark_type: WatermarkSerdeType,
420 ) -> Self {
421 let mut this = Self {
422 direction,
423 watermarks: Vec::new(),
424 watermark_type,
425 };
426 this.add_new_epoch_watermarks(epoch, watermarks.into(), direction, watermark_type);
427 this
428 }
429
430 pub fn add_new_epoch_watermarks(
431 &mut self,
432 epoch: HummockEpoch,
433 watermarks: Arc<[VnodeWatermark]>,
434 direction: WatermarkDirection,
435 watermark_type: WatermarkSerdeType,
436 ) {
437 assert_eq!(self.direction, direction);
438 assert_eq!(self.watermark_type, watermark_type);
439
440 if let Some((prev_epoch, _)) = self.watermarks.last() {
441 assert!(*prev_epoch < epoch);
442 }
443 if !watermarks.is_empty() {
444 let vnode_count = watermarks[0].vnode_count();
445 for watermark in &*watermarks {
446 assert_eq!(watermark.vnode_count(), vnode_count);
447 }
448 if let Some(existing_vnode_count) = self.vnode_count() {
449 assert_eq!(existing_vnode_count, vnode_count);
450 }
451 }
452 self.watermarks.push((epoch, watermarks));
453 }
454
455 fn vnode_count(&self) -> Option<usize> {
457 self.watermarks
458 .iter()
459 .flat_map(|(_, watermarks)| watermarks.as_ref())
460 .next()
461 .map(|w| w.vnode_count())
462 }
463
464 pub fn from_protobuf(pb: &PbTableWatermarks) -> Self {
465 Self {
466 watermarks: pb
467 .epoch_watermarks
468 .iter()
469 .map(|epoch_watermark| {
470 let epoch = epoch_watermark.epoch;
471 let watermarks = epoch_watermark
472 .watermarks
473 .iter()
474 .map(VnodeWatermark::from)
475 .collect_vec();
476 (epoch, Arc::from(watermarks))
477 })
478 .collect(),
479 direction: if pb.is_ascending {
480 WatermarkDirection::Ascending
481 } else {
482 WatermarkDirection::Descending
483 },
484 watermark_type: if pb.is_non_pk_prefix {
485 WatermarkSerdeType::NonPkPrefix
486 } else {
487 WatermarkSerdeType::PkPrefix
488 },
489 }
490 }
491}
492
493pub fn merge_multiple_new_table_watermarks(
494 table_watermarks_list: impl IntoIterator<Item = HashMap<TableId, TableWatermarks>>,
495) -> HashMap<TableId, TableWatermarks> {
496 #[allow(clippy::type_complexity)]
497 let mut ret: HashMap<
498 TableId,
499 (
500 WatermarkDirection,
501 BTreeMap<u64, Vec<VnodeWatermark>>,
502 WatermarkSerdeType,
503 ),
504 > = HashMap::new();
505 for table_watermarks in table_watermarks_list {
506 for (table_id, new_table_watermarks) in table_watermarks {
507 let epoch_watermarks = match ret.entry(table_id) {
508 Entry::Occupied(entry) => {
509 let (direction, epoch_watermarks, watermark_type) = entry.into_mut();
510 assert_eq!(&new_table_watermarks.direction, direction);
511 assert_eq!(&new_table_watermarks.watermark_type, watermark_type);
512 epoch_watermarks
513 }
514 Entry::Vacant(entry) => {
515 let (_, epoch_watermarks, _) = entry.insert((
516 new_table_watermarks.direction,
517 BTreeMap::new(),
518 new_table_watermarks.watermark_type,
519 ));
520 epoch_watermarks
521 }
522 };
523 for (new_epoch, new_epoch_watermarks) in new_table_watermarks.watermarks {
524 epoch_watermarks
525 .entry(new_epoch)
526 .or_insert_with(Vec::new)
527 .extend(new_epoch_watermarks.iter().cloned());
528 }
529 }
530 }
531 ret.into_iter()
532 .map(
533 |(table_id, (direction, epoch_watermarks, watermark_type))| {
534 (
535 table_id,
536 TableWatermarks {
537 direction,
538 watermarks: epoch_watermarks
540 .into_iter()
541 .map(|(epoch, watermarks)| (epoch, Arc::from(watermarks)))
542 .collect(),
543 watermark_type,
544 },
545 )
546 },
547 )
548 .collect()
549}
550
551impl TableWatermarks {
552 pub fn apply_new_table_watermarks(&mut self, newly_added_watermarks: &TableWatermarks) {
553 assert_eq!(self.direction, newly_added_watermarks.direction);
554 assert!(self.watermarks.iter().map(|(epoch, _)| epoch).is_sorted());
555 assert!(
556 newly_added_watermarks
557 .watermarks
558 .iter()
559 .map(|(epoch, _)| epoch)
560 .is_sorted()
561 );
562 if let Some((prev_last_epoch, _)) = self.watermarks.last()
564 && let Some((new_first_epoch, _)) = newly_added_watermarks.watermarks.first()
565 {
566 assert!(prev_last_epoch < new_first_epoch);
567 }
568 self.watermarks.extend(
569 newly_added_watermarks
570 .watermarks
571 .iter()
572 .map(|(epoch, new_watermarks)| (*epoch, new_watermarks.clone())),
573 );
574 }
575
576 pub fn clear_stale_epoch_watermark(&mut self, safe_epoch: u64) {
577 match self.watermarks.first() {
578 None => {
579 return;
581 }
582 Some((earliest_epoch, _)) => {
583 if *earliest_epoch >= safe_epoch {
584 return;
586 }
587 }
588 }
589 debug!("clear stale table watermark below epoch {}", safe_epoch);
590 let mut result_epoch_watermark = Vec::with_capacity(self.watermarks.len());
591 let mut set_vnode: HashSet<VirtualNode> = HashSet::new();
592 let mut vnode_count: Option<usize> = None; while let Some((epoch, _)) = self.watermarks.last() {
594 if *epoch >= safe_epoch {
595 let (epoch, watermarks) = self.watermarks.pop().expect("have check Some");
596 for watermark in watermarks.as_ref() {
597 vnode_count.get_or_insert_with(|| watermark.vnode_count());
598 for vnode in watermark.vnode_bitmap.iter_vnodes() {
599 set_vnode.insert(vnode);
600 }
601 }
602 result_epoch_watermark.push((epoch, watermarks));
603 } else {
604 break;
605 }
606 }
607 while vnode_count != Some(set_vnode.len())
608 && let Some((_, watermarks)) = self.watermarks.pop()
609 {
610 let mut new_vnode_watermarks = Vec::new();
611 for vnode_watermark in watermarks.as_ref() {
612 let mut new_set_vnode = Vec::new();
613 vnode_count.get_or_insert_with(|| vnode_watermark.vnode_count());
614 for vnode in vnode_watermark.vnode_bitmap.iter_vnodes() {
615 if set_vnode.insert(vnode) {
616 new_set_vnode.push(vnode);
617 }
618 }
619 if !new_set_vnode.is_empty() {
620 let mut builder = BitmapBuilder::zeroed(vnode_watermark.vnode_count());
621 for vnode in new_set_vnode {
622 builder.set(vnode.to_index(), true);
623 }
624 let bitmap = Arc::new(builder.finish());
625 new_vnode_watermarks.push(VnodeWatermark {
626 vnode_bitmap: bitmap,
627 watermark: vnode_watermark.watermark.clone(),
628 })
629 }
630 }
631 if !new_vnode_watermarks.is_empty() {
632 if let Some((last_epoch, last_watermarks)) = result_epoch_watermark.last_mut()
633 && *last_epoch == safe_epoch
634 {
635 *last_watermarks = Arc::from(
636 last_watermarks
637 .iter()
638 .cloned()
639 .chain(new_vnode_watermarks.into_iter())
640 .collect_vec(),
641 );
642 } else {
643 result_epoch_watermark.push((safe_epoch, Arc::from(new_vnode_watermarks)));
644 }
645 }
646 }
647 result_epoch_watermark.reverse();
650 assert!(
651 result_epoch_watermark
652 .is_sorted_by(|(first_epoch, _), (second_epoch, _)| { first_epoch < second_epoch })
653 );
654 *self = TableWatermarks {
655 watermarks: result_epoch_watermark,
656 direction: self.direction,
657 watermark_type: self.watermark_type,
658 }
659 }
660}
661
662impl TableWatermarks {
663 pub fn estimated_encode_len(&self) -> usize {
664 self.watermarks.len() * size_of::<HummockEpoch>()
665 + self
666 .watermarks
667 .iter()
668 .map(|(_, watermarks)| {
669 watermarks
670 .iter()
671 .map(|watermark| watermark.estimated_size())
672 .sum::<usize>()
673 })
674 .sum::<usize>()
675 + size_of::<bool>() }
677
678 pub fn to_protobuf(&self) -> PbTableWatermarks {
679 self.into()
680 }
681}
682
683impl From<&PbTableWatermarks> for TableWatermarks {
684 fn from(pb: &PbTableWatermarks) -> Self {
685 Self {
686 watermarks: pb
687 .epoch_watermarks
688 .iter()
689 .map(|epoch_watermark| {
690 let epoch = epoch_watermark.epoch;
691 let watermarks = epoch_watermark
692 .watermarks
693 .iter()
694 .map(VnodeWatermark::from)
695 .collect();
696 (epoch, watermarks)
697 })
698 .collect(),
699 direction: if pb.is_ascending {
700 WatermarkDirection::Ascending
701 } else {
702 WatermarkDirection::Descending
703 },
704 watermark_type: if pb.is_non_pk_prefix {
705 WatermarkSerdeType::NonPkPrefix
706 } else {
707 WatermarkSerdeType::PkPrefix
708 },
709 }
710 }
711}
712
713impl From<&TableWatermarks> for PbTableWatermarks {
714 fn from(table_watermarks: &TableWatermarks) -> Self {
715 Self {
716 epoch_watermarks: table_watermarks
717 .watermarks
718 .iter()
719 .map(|(epoch, watermarks)| PbEpochNewWatermarks {
720 watermarks: watermarks.iter().map(|wm| wm.into()).collect(),
721 epoch: *epoch,
722 })
723 .collect(),
724 is_ascending: match table_watermarks.direction {
725 WatermarkDirection::Ascending => true,
726 WatermarkDirection::Descending => false,
727 },
728 is_non_pk_prefix: match table_watermarks.watermark_type {
729 WatermarkSerdeType::NonPkPrefix => true,
730 WatermarkSerdeType::PkPrefix => false,
731 },
732 }
733 }
734}
735
736impl From<PbTableWatermarks> for TableWatermarks {
737 fn from(pb: PbTableWatermarks) -> Self {
738 Self {
739 watermarks: pb
740 .epoch_watermarks
741 .into_iter()
742 .map(|epoch_watermark| {
743 let epoch = epoch_watermark.epoch;
744 let watermarks = epoch_watermark
745 .watermarks
746 .into_iter()
747 .map(VnodeWatermark::from)
748 .collect();
749 (epoch, watermarks)
750 })
751 .collect(),
752 direction: if pb.is_ascending {
753 WatermarkDirection::Ascending
754 } else {
755 WatermarkDirection::Descending
756 },
757 watermark_type: if pb.is_non_pk_prefix {
758 WatermarkSerdeType::NonPkPrefix
759 } else {
760 WatermarkSerdeType::PkPrefix
761 },
762 }
763 }
764}
765
766impl From<TableWatermarks> for PbTableWatermarks {
767 fn from(table_watermarks: TableWatermarks) -> Self {
768 Self {
769 epoch_watermarks: table_watermarks
770 .watermarks
771 .into_iter()
772 .map(|(epoch, watermarks)| PbEpochNewWatermarks {
773 watermarks: watermarks.iter().map(PbVnodeWatermark::from).collect(),
774 epoch,
775 })
776 .collect(),
777 is_ascending: match table_watermarks.direction {
778 WatermarkDirection::Ascending => true,
779 WatermarkDirection::Descending => false,
780 },
781 is_non_pk_prefix: match table_watermarks.watermark_type {
782 WatermarkSerdeType::NonPkPrefix => true,
783 WatermarkSerdeType::PkPrefix => false,
784 },
785 }
786 }
787}
788
/// Serde type of a table watermark; stored in protobuf as the `is_non_pk_prefix` flag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WatermarkSerdeType {
    /// Encoded in protobuf as `is_non_pk_prefix == false`.
    PkPrefix,
    /// Encoded in protobuf as `is_non_pk_prefix == true`.
    NonPkPrefix,
}
794
795#[cfg(test)]
796mod tests {
797 use std::collections::Bound::Included;
798 use std::collections::{Bound, HashMap};
799 use std::ops::Bound::Excluded;
800 use std::sync::Arc;
801 use std::vec;
802
803 use bytes::Bytes;
804 use itertools::Itertools;
805 use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
806 use risingwave_common::catalog::TableId;
807 use risingwave_common::hash::VirtualNode;
808 use risingwave_common::util::epoch::{EpochExt, test_epoch};
809 use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo};
810
811 use crate::compaction_group::StaticCompactionGroupId;
812 use crate::key::{TableKeyRange, is_empty_key_range, prefixed_range_with_vnode};
813 use crate::table_watermark::{
814 TableWatermarks, TableWatermarksIndex, VnodeWatermark, WatermarkDirection,
815 WatermarkSerdeType, merge_multiple_new_table_watermarks,
816 };
817 use crate::version::HummockVersion;
818
819 fn build_bitmap(vnodes: impl IntoIterator<Item = usize>) -> Arc<Bitmap> {
820 let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
821 for vnode in vnodes {
822 builder.set(vnode, true);
823 }
824 Arc::new(builder.finish())
825 }
826
/// Adding epochs one by one must give the same result as applying them in bulk through
/// `apply_new_table_watermarks`.
#[test]
fn test_apply_new_table_watermark() {
    /// A one-element watermark list covering `vnodes`.
    fn single(vnodes: impl IntoIterator<Item = usize>, watermark: &Bytes) -> Vec<VnodeWatermark> {
        vec![VnodeWatermark::new(build_bitmap(vnodes), watermark.clone())]
    }

    let direction = WatermarkDirection::Ascending;
    let watermark_type = WatermarkSerdeType::PkPrefix;
    let watermark1 = Bytes::from("watermark1");
    let watermark2 = Bytes::from("watermark2");
    let watermark3 = Bytes::from("watermark3");
    let watermark4 = Bytes::from("watermark4");

    let epoch1 = test_epoch(1);
    let epoch2 = epoch1.next_epoch();
    let epoch3 = epoch2.next_epoch();
    let epoch4 = epoch3.next_epoch();
    let epoch5 = epoch4.next_epoch();

    // Epochs 1 and 2 are shared by the incremental and the bulk path.
    let mut table_watermarks = TableWatermarks::single_epoch(
        epoch1,
        single(vec![0, 1, 2], &watermark1),
        direction,
        watermark_type,
    );
    table_watermarks.add_new_epoch_watermarks(
        epoch2,
        single(vec![0, 1, 2, 3], &watermark2).into(),
        direction,
        watermark_type,
    );
    let mut table_watermark_checkpoint = table_watermarks.clone();

    // Epochs 3 and 5 are added incrementally to one side and collected in a second
    // `TableWatermarks` for the other side.
    let mut second_table_watermark = TableWatermarks::single_epoch(
        epoch3,
        single(0..VirtualNode::COUNT_FOR_TEST, &watermark3),
        direction,
        watermark_type,
    );
    table_watermarks.add_new_epoch_watermarks(
        epoch3,
        single(0..VirtualNode::COUNT_FOR_TEST, &watermark3).into(),
        direction,
        watermark_type,
    );
    table_watermarks.add_new_epoch_watermarks(
        epoch5,
        single(vec![0, 3, 4], &watermark4).into(),
        direction,
        watermark_type,
    );
    second_table_watermark.add_new_epoch_watermarks(
        epoch5,
        single(vec![0, 3, 4], &watermark4).into(),
        direction,
        watermark_type,
    );

    table_watermark_checkpoint.apply_new_table_watermarks(&second_table_watermark);
    assert_eq!(table_watermarks, table_watermark_checkpoint);
}
905
/// Clears stale epochs at increasing safe epochs and checks the surviving watermarks after
/// each step.
#[test]
fn test_clear_stale_epoch_watmermark() {
    /// A watermark covering `vnodes`.
    fn wm(vnodes: impl IntoIterator<Item = usize>, watermark: &Bytes) -> VnodeWatermark {
        VnodeWatermark::new(build_bitmap(vnodes), watermark.clone())
    }

    let direction = WatermarkDirection::Ascending;
    let watermark_type = WatermarkSerdeType::PkPrefix;
    let watermark1 = Bytes::from("watermark1");
    let watermark2 = Bytes::from("watermark2");
    let watermark3 = Bytes::from("watermark3");
    let watermark4 = Bytes::from("watermark4");

    let epoch1 = test_epoch(1);
    let epoch2 = epoch1.next_epoch();
    let epoch3 = epoch2.next_epoch();
    let epoch4 = epoch3.next_epoch();
    let epoch5 = epoch4.next_epoch();

    // Wraps a list of per-epoch watermarks into the expected `TableWatermarks`.
    let expected = |watermarks: Vec<(u64, Arc<[VnodeWatermark]>)>| TableWatermarks {
        watermarks,
        direction,
        watermark_type,
    };

    let mut table_watermarks = TableWatermarks::single_epoch(
        epoch1,
        vec![wm(vec![0, 1, 2], &watermark1)],
        direction,
        watermark_type,
    );
    table_watermarks.add_new_epoch_watermarks(
        epoch2,
        vec![wm(vec![0, 1, 2, 3], &watermark2)].into(),
        direction,
        watermark_type,
    );
    table_watermarks.add_new_epoch_watermarks(
        epoch3,
        vec![wm(0..VirtualNode::COUNT_FOR_TEST, &watermark3)].into(),
        direction,
        watermark_type,
    );
    table_watermarks.add_new_epoch_watermarks(
        epoch5,
        vec![wm(vec![0, 3, 4], &watermark4)].into(),
        direction,
        watermark_type,
    );

    // The earliest epoch is not below the safe epoch: nothing changes.
    let mut table_watermarks_checkpoint = table_watermarks.clone();
    table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch1);
    assert_eq!(table_watermarks_checkpoint, table_watermarks);

    // Epoch 1 is dropped; the kept epochs already cover every vnode.
    table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch2);
    assert_eq!(
        table_watermarks_checkpoint,
        expected(vec![
            (epoch2, vec![wm(vec![0, 1, 2, 3], &watermark2)].into()),
            (
                epoch3,
                vec![wm(0..VirtualNode::COUNT_FOR_TEST, &watermark3)].into()
            ),
            (epoch5, vec![wm(vec![0, 3, 4], &watermark4)].into()),
        ])
    );

    // Epoch 2 is dropped as well.
    table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch3);
    assert_eq!(
        table_watermarks_checkpoint,
        expected(vec![
            (
                epoch3,
                vec![wm(0..VirtualNode::COUNT_FOR_TEST, &watermark3)].into()
            ),
            (epoch5, vec![wm(vec![0, 3, 4], &watermark4)].into()),
        ])
    );

    // Epoch 3 is stale: its watermark moves to epoch 4 for the vnodes epoch 5 does not cover.
    table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch4);
    assert_eq!(
        table_watermarks_checkpoint,
        expected(vec![
            (
                epoch4,
                vec![wm(
                    (1..3).chain(5..VirtualNode::COUNT_FOR_TEST),
                    &watermark3
                )]
                .into()
            ),
            (epoch5, vec![wm(vec![0, 3, 4], &watermark4)].into()),
        ])
    );

    // The rewritten epoch 4 entry is stale now and is merged into the epoch 5 entry.
    table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch5);
    assert_eq!(
        table_watermarks_checkpoint,
        expected(vec![(
            epoch5,
            vec![
                wm(vec![0, 3, 4], &watermark4),
                wm((1..3).chain(5..VirtualNode::COUNT_FOR_TEST), &watermark3),
            ]
            .into()
        )])
    );
}
1072
/// Merging two maps must concatenate the watermarks of a shared table per epoch and pass
/// through the tables that occur in only one map.
#[test]
fn test_merge_multiple_new_table_watermarks() {
    /// One epoch entry with a watermark per bitmap; the watermark bytes encode the epoch.
    fn epoch_new_watermark(epoch: u64, bitmaps: Vec<&Bitmap>) -> (u64, Arc<[VnodeWatermark]>) {
        let watermarks = bitmaps
            .into_iter()
            .map(|bitmap| VnodeWatermark {
                watermark: Bytes::from(vec![1, 2, epoch as _]),
                vnode_bitmap: Arc::new(bitmap.clone()),
            })
            .collect_vec();
        (epoch, watermarks.into())
    }

    /// Ascending, pk-prefix watermarks over `vnodes` for each of `epochs`.
    fn build_table_watermark(
        vnodes: impl IntoIterator<Item = usize>,
        epochs: impl IntoIterator<Item = u64>,
    ) -> TableWatermarks {
        let bitmap = build_bitmap(vnodes);
        TableWatermarks {
            watermarks: epochs
                .into_iter()
                .map(|epoch: u64| epoch_new_watermark(epoch, vec![&bitmap]))
                .collect(),
            direction: WatermarkDirection::Ascending,
            watermark_type: WatermarkSerdeType::PkPrefix,
        }
    }

    let table1_watermark1 = build_table_watermark(0..3, vec![1, 2, 4]);
    let table1_watermark2 = build_table_watermark(4..6, vec![1, 2, 5]);
    let table2_watermark = build_table_watermark(0..4, 1..3);
    let table3_watermark = build_table_watermark(0..4, 3..5);

    // Table 1 appears in both inputs; tables 2 and 3 appear in one input each.
    let first = HashMap::from([
        (TableId::new(1), table1_watermark1),
        (TableId::new(2), table2_watermark.clone()),
    ]);
    let second = HashMap::from([
        (TableId::new(1), table1_watermark2),
        (TableId::new(3), table3_watermark.clone()),
    ]);
    let result = merge_multiple_new_table_watermarks(vec![first, second]);

    let low_vnodes = build_bitmap(0..3);
    let high_vnodes = build_bitmap(4..6);
    let merged_table1 = TableWatermarks {
        watermarks: vec![
            epoch_new_watermark(1, vec![&*low_vnodes, &*high_vnodes]),
            epoch_new_watermark(2, vec![&*low_vnodes, &*high_vnodes]),
            epoch_new_watermark(4, vec![&*low_vnodes]),
            epoch_new_watermark(5, vec![&*high_vnodes]),
        ],
        direction: WatermarkDirection::Ascending,
        watermark_type: WatermarkSerdeType::PkPrefix,
    };
    let expected = HashMap::from([
        (TableId::new(1), merged_table1),
        (TableId::new(2), table2_watermark),
        (TableId::new(3), table3_watermark),
    ]);
    assert_eq!(result, expected);
}
1131
1132 const COMMITTED_EPOCH: u64 = test_epoch(1);
1133 const EPOCH1: u64 = test_epoch(2);
1134 const EPOCH2: u64 = test_epoch(3);
1135 const TEST_SINGLE_VNODE: VirtualNode = VirtualNode::from_index(1);
1136
1137 fn build_watermark_range(
1138 direction: WatermarkDirection,
1139 (low, high): (Bound<Bytes>, Bound<Bytes>),
1140 ) -> TableKeyRange {
1141 let range = match direction {
1142 WatermarkDirection::Ascending => (low, high),
1143 WatermarkDirection::Descending => (high, low),
1144 };
1145 prefixed_range_with_vnode(range, TEST_SINGLE_VNODE)
1146 }
1147
1148 fn build_and_test_watermark_index(
1152 direction: WatermarkDirection,
1153 watermark1: Bytes,
1154 watermark2: Bytes,
1155 watermark3: Bytes,
1156 ) -> TableWatermarksIndex {
1157 let mut index = TableWatermarksIndex::new(
1158 direction,
1159 EPOCH1,
1160 vec![VnodeWatermark::new(build_bitmap(0..4), watermark1.clone())],
1161 Some(COMMITTED_EPOCH),
1162 );
1163 index.add_epoch_watermark(
1164 EPOCH2,
1165 vec![VnodeWatermark::new(build_bitmap(1..5), watermark2.clone())].into(),
1166 direction,
1167 );
1168
1169 assert_eq!(
1170 index.read_watermark(VirtualNode::from_index(0), EPOCH1),
1171 Some(watermark1.clone())
1172 );
1173 assert_eq!(
1174 index.read_watermark(VirtualNode::from_index(1), EPOCH1),
1175 Some(watermark1.clone())
1176 );
1177 assert_eq!(
1178 index.read_watermark(VirtualNode::from_index(4), EPOCH1),
1179 None
1180 );
1181 assert_eq!(
1182 index.read_watermark(VirtualNode::from_index(0), EPOCH2),
1183 Some(watermark1.clone())
1184 );
1185 assert_eq!(
1186 index.read_watermark(VirtualNode::from_index(1), EPOCH2),
1187 Some(watermark2.clone())
1188 );
1189 assert_eq!(
1190 index.read_watermark(VirtualNode::from_index(4), EPOCH2),
1191 Some(watermark2.clone())
1192 );
1193 assert_eq!(
1194 index.latest_watermark(VirtualNode::from_index(0)),
1195 Some(watermark1.clone())
1196 );
1197 assert_eq!(
1198 index.latest_watermark(VirtualNode::from_index(1)),
1199 Some(watermark2.clone())
1200 );
1201 assert_eq!(
1202 index.latest_watermark(VirtualNode::from_index(4)),
1203 Some(watermark2.clone())
1204 );
1205
1206 let check_watermark_range =
1208 |query_range: (Bound<Bytes>, Bound<Bytes>),
1209 output_range: Option<(Bound<Bytes>, Bound<Bytes>)>| {
1210 let mut range = build_watermark_range(direction, query_range);
1211 index.rewrite_range_with_table_watermark(EPOCH2, &mut range);
1212 if let Some(output_range) = output_range {
1213 assert_eq!(range, build_watermark_range(direction, output_range));
1214 } else {
1215 assert!(is_empty_key_range(&range));
1216 }
1217 };
1218
1219 check_watermark_range(
1221 (Included(watermark1.clone()), Excluded(watermark3.clone())),
1222 Some((Included(watermark2.clone()), Excluded(watermark3.clone()))),
1223 );
1224
1225 check_watermark_range(
1227 (Included(watermark2.clone()), Excluded(watermark3.clone())),
1228 Some((Included(watermark2.clone()), Excluded(watermark3.clone()))),
1229 );
1230 check_watermark_range(
1231 (Excluded(watermark2.clone()), Excluded(watermark3.clone())),
1232 Some((Excluded(watermark2.clone()), Excluded(watermark3.clone()))),
1233 );
1234
1235 check_watermark_range(
1237 (Excluded(watermark1.clone()), Excluded(watermark2.clone())),
1238 None,
1239 );
1240 check_watermark_range(
1241 (Excluded(watermark1.clone()), Included(watermark2.clone())),
1242 Some((Included(watermark2.clone()), Included(watermark2.clone()))),
1243 );
1244
1245 index
1246 }
1247
/// Runs the shared index checks with an ascending watermark, using three
/// watermarks that increase in byte order.
#[test]
fn test_watermark_index_ascending() {
    build_and_test_watermark_index(
        WatermarkDirection::Ascending,
        Bytes::from_static(b"watermark1"),
        Bytes::from_static(b"watermark2"),
        Bytes::from_static(b"watermark3"),
    );
}
1260
/// Runs the shared index checks with a descending watermark, using three
/// watermarks that decrease in byte order.
#[test]
fn test_watermark_index_descending() {
    build_and_test_watermark_index(
        WatermarkDirection::Descending,
        Bytes::from_static(b"watermark254"),
        Bytes::from_static(b"watermark253"),
        Bytes::from_static(b"watermark252"),
    );
}
1273
/// Applies committed watermarks for `EPOCH1` to an index that already holds
/// staging watermarks for `EPOCH1` and `EPOCH2`, then checks that the epochs
/// tracked by the index and the watermarks visible per vnode are as expected.
#[test]
fn test_apply_committed_index() {
    let watermark1 = Bytes::from_static(b"watermark1");
    let watermark2 = Bytes::from_static(b"watermark2");
    let mut index = build_and_test_watermark_index(
        WatermarkDirection::Ascending,
        watermark1.clone(),
        watermark2.clone(),
        Bytes::from_static(b"watermark3"),
    );

    let test_table_id = TableId::from(233);

    // A version whose only state table is committed at EPOCH1.
    let table_info = StateTableInfo {
        committed_epoch: EPOCH1,
        compaction_group_id: StaticCompactionGroupId::StateDefault as _,
    };
    let pb_version = PbHummockVersion {
        state_table_info: HashMap::from_iter([(test_table_id.table_id, table_info)]),
        ..Default::default()
    };
    let mut version = HummockVersion::from_rpc_protobuf(&pb_version);

    // Committed watermarks: `watermark1` on every vnode at EPOCH1.
    let committed = TableWatermarks {
        watermarks: vec![(
            EPOCH1,
            vec![VnodeWatermark {
                watermark: watermark1.clone(),
                vnode_bitmap: build_bitmap(0..VirtualNode::COUNT_FOR_TEST),
            }]
            .into(),
        )],
        direction: WatermarkDirection::Ascending,
        watermark_type: WatermarkSerdeType::PkPrefix,
    };
    version
        .table_watermarks
        .insert(test_table_id, committed.into());

    let committed_watermarks = version
        .table_watermarks
        .get(&test_table_id)
        .unwrap()
        .clone();
    index.apply_committed_watermarks(committed_watermarks, EPOCH1);

    assert_eq!(EPOCH1, index.committed_epoch.unwrap());
    assert_eq!(EPOCH2, index.latest_epoch);

    for vnode in (0..VirtualNode::COUNT_FOR_TEST).map(VirtualNode::from_index) {
        // Every vnode sees the committed watermark at EPOCH1.
        assert_eq!(watermark1, index.read_watermark(vnode, EPOCH1).unwrap());
        // Vnodes 1..5 additionally carry the EPOCH2 staging watermark.
        if (1..5).contains(&vnode.to_index()) {
            assert_eq!(watermark2, index.read_watermark(vnode, EPOCH2).unwrap());
        }
    }
}
1334
/// Checks `filter_regress_watermarks` against the index built by
/// `build_and_test_watermark_index` (latest watermark is `watermark1` on
/// vnode 0, `watermark2` on vnodes 1..5, and absent elsewhere).
#[test]
fn test_filter_regress_watermark() {
    /// Shorthand for a `VnodeWatermark` covering `vnodes` with `watermark`.
    fn vnode_watermark(vnodes: std::ops::Range<usize>, watermark: &Bytes) -> VnodeWatermark {
        VnodeWatermark {
            vnode_bitmap: build_bitmap(vnodes),
            watermark: watermark.clone(),
        }
    }

    let watermark1 = Bytes::from_static(b"watermark1");
    let watermark2 = Bytes::from_static(b"watermark2");
    let watermark3 = Bytes::from_static(b"watermark3");
    let index = build_and_test_watermark_index(
        WatermarkDirection::Ascending,
        watermark1.clone(),
        watermark2.clone(),
        watermark3.clone(),
    );

    let mut new_watermarks = vec![
        // vnode 1 would go back from watermark2 to watermark1; vnode 0 would not.
        vnode_watermark(0..2, &watermark1),
        // Moves forward on both vnodes.
        vnode_watermark(2..4, &watermark3),
        // vnode 4 would go back from watermark2 to watermark1.
        vnode_watermark(4..5, &watermark1),
        // vnode 5 has no watermark in the index yet.
        vnode_watermark(5..6, &watermark3),
    ];

    index.filter_regress_watermarks(&mut new_watermarks);

    // The first entry is narrowed to vnode 0, the third is removed entirely,
    // and the other two are left unchanged.
    let expected = vec![
        vnode_watermark(0..1, &watermark1),
        vnode_watermark(2..4, &watermark3),
        vnode_watermark(5..6, &watermark3),
    ];
    assert_eq!(new_watermarks, expected);
}
1390}