risingwave_stream/executor/watermark/
mod.rs1use std::cmp::Reverse;
16use std::collections::{BTreeMap, BinaryHeap, HashSet, VecDeque};
17use std::hash::Hash;
18
19use super::Watermark;
20
21#[derive(Default, Debug)]
22pub(super) struct StagedWatermarks {
23 in_heap: bool,
24 staged: VecDeque<Watermark>,
25}
26
27pub(super) struct BufferedWatermarks<Id> {
28 pub first_buffered_watermarks: BinaryHeap<Reverse<(Watermark, Id)>>,
31 pub other_buffered_watermarks: BTreeMap<Id, StagedWatermarks>,
34}
35
36impl<Id: Ord + Hash + std::fmt::Debug> BufferedWatermarks<Id> {
37 pub fn with_ids(buffer_ids: impl IntoIterator<Item = Id>) -> Self {
38 let other_buffered_watermarks: BTreeMap<_, _> = buffer_ids
39 .into_iter()
40 .map(|id| (id, Default::default()))
41 .collect();
42 let first_buffered_watermarks = BinaryHeap::with_capacity(other_buffered_watermarks.len());
43
44 BufferedWatermarks {
45 first_buffered_watermarks,
46 other_buffered_watermarks,
47 }
48 }
49
50 pub fn add_buffers(&mut self, buffer_ids: impl IntoIterator<Item = Id>) {
51 buffer_ids.into_iter().for_each(|id| {
52 self.other_buffered_watermarks
53 .try_insert(id, Default::default())
54 .unwrap();
55 });
56 }
57
58 pub fn clear(&mut self) {
59 self.first_buffered_watermarks.clear();
60 self.other_buffered_watermarks
61 .values_mut()
62 .for_each(|staged_watermarks| {
63 std::mem::take(staged_watermarks);
64 });
65 }
66
67 pub fn handle_watermark(&mut self, buffer_id: Id, watermark: Watermark) -> Option<Watermark> {
70 let staged = self.other_buffered_watermarks.get_mut(&buffer_id).unwrap();
72
73 if staged.in_heap {
74 staged.staged.push_back(watermark);
75 None
76 } else {
77 staged.in_heap = true;
78 self.first_buffered_watermarks
79 .push(Reverse((watermark, buffer_id)));
80 self.check_watermark_heap()
81 }
82 }
83
84 pub fn check_watermark_heap(&mut self) -> Option<Watermark> {
86 let len = self.other_buffered_watermarks.len();
87 let mut watermark_to_emit = None;
88 while !self.first_buffered_watermarks.is_empty()
89 && (self.first_buffered_watermarks.len() == len
90 || watermark_to_emit.as_ref().is_some_and(|watermark| {
91 watermark == &self.first_buffered_watermarks.peek().unwrap().0.0
92 }))
93 {
94 let Reverse((watermark, id)) = self.first_buffered_watermarks.pop().unwrap();
95 watermark_to_emit = Some(watermark);
96 let staged = self.other_buffered_watermarks.get_mut(&id).unwrap();
97 if let Some(first) = staged.staged.pop_front() {
98 self.first_buffered_watermarks.push(Reverse((first, id)));
99 } else {
100 staged.in_heap = false;
101 }
102 }
103 watermark_to_emit
104 }
105
106 pub fn remove_buffer(&mut self, buffer_ids_to_remove: HashSet<Id>) -> Option<Watermark> {
108 self.first_buffered_watermarks
109 .retain(|Reverse((_, id))| !buffer_ids_to_remove.contains(id));
110 self.other_buffered_watermarks
111 .retain(|id, _| !buffer_ids_to_remove.contains(id));
112 self.check_watermark_heap()
115 }
116}