// risingwave_stream/executor/watermark/mod.rs

use std::cmp::Reverse;
use std::collections::{BTreeMap, BinaryHeap, HashSet, VecDeque};
use std::hash::Hash;

use super::Watermark;

/// Upper bound on watermarks staged per input; when exceeded, the oldest
/// staged entry is evicted first.
const MAX_STAGED_WATERMARKS_PER_INPUT: usize = 1024;
/// Per-input staging area for watermarks that cannot yet enter the min-heap.
#[derive(Default, Debug)]
pub(super) struct StagedWatermarks {
    // True while this input currently has exactly one watermark sitting in
    // `BufferedWatermarks::first_buffered_watermarks`.
    in_heap: bool,
    // Watermarks received while `in_heap` is true, kept in arrival order and
    // capped at `MAX_STAGED_WATERMARKS_PER_INPUT` by `push_with_cap`.
    staged: VecDeque<Watermark>,
}
28
29impl StagedWatermarks {
30 fn push_with_cap(&mut self, watermark: Watermark) {
31 if self.staged.len() >= MAX_STAGED_WATERMARKS_PER_INPUT {
32 let _ = self.staged.pop_front();
33 }
34 self.staged.push_back(watermark);
35 }
36}
37
/// Buffers the watermarks of several inputs and computes when the global
/// minimum watermark advances.
pub(super) struct BufferedWatermarks<Id> {
    // Min-heap (via `Reverse`) holding at most one watermark per input — the
    // earliest one that input has reported and we have not yet popped.
    pub first_buffered_watermarks: BinaryHeap<Reverse<(Watermark, Id)>>,
    // For every tracked input id: its staging state and any later watermarks
    // waiting behind the one currently in the heap.
    pub other_buffered_watermarks: BTreeMap<Id, StagedWatermarks>,
}
46
47impl<Id: Ord + Hash + std::fmt::Debug> BufferedWatermarks<Id> {
48 pub fn with_ids(buffer_ids: impl IntoIterator<Item = Id>) -> Self {
49 let other_buffered_watermarks: BTreeMap<_, _> = buffer_ids
50 .into_iter()
51 .map(|id| (id, Default::default()))
52 .collect();
53 let first_buffered_watermarks = BinaryHeap::with_capacity(other_buffered_watermarks.len());
54
55 BufferedWatermarks {
56 first_buffered_watermarks,
57 other_buffered_watermarks,
58 }
59 }
60
61 pub fn add_buffers(&mut self, buffer_ids: impl IntoIterator<Item = Id>) {
62 buffer_ids.into_iter().for_each(|id| {
63 self.other_buffered_watermarks
64 .try_insert(id, Default::default())
65 .unwrap();
66 });
67 }
68
69 pub fn clear(&mut self) {
70 self.first_buffered_watermarks.clear();
71 self.other_buffered_watermarks
72 .values_mut()
73 .for_each(|staged_watermarks| {
74 std::mem::take(staged_watermarks);
75 });
76 }
77
78 pub fn handle_watermark(&mut self, buffer_id: Id, watermark: Watermark) -> Option<Watermark> {
81 let staged = self.other_buffered_watermarks.get_mut(&buffer_id).unwrap();
83
84 if staged.in_heap {
85 staged.push_with_cap(watermark);
86 None
87 } else {
88 staged.in_heap = true;
89 self.first_buffered_watermarks
90 .push(Reverse((watermark, buffer_id)));
91 self.check_watermark_heap()
92 }
93 }
94
95 pub fn check_watermark_heap(&mut self) -> Option<Watermark> {
97 let len = self.other_buffered_watermarks.len();
98 let mut watermark_to_emit = None;
99 while !self.first_buffered_watermarks.is_empty()
100 && (self.first_buffered_watermarks.len() == len
101 || watermark_to_emit.as_ref().is_some_and(|watermark| {
102 watermark == &self.first_buffered_watermarks.peek().unwrap().0.0
103 }))
104 {
105 let Reverse((watermark, id)) = self.first_buffered_watermarks.pop().unwrap();
106 watermark_to_emit = Some(watermark);
107 let staged = self.other_buffered_watermarks.get_mut(&id).unwrap();
108 if let Some(first) = staged.staged.pop_front() {
109 self.first_buffered_watermarks.push(Reverse((first, id)));
110 } else {
111 staged.in_heap = false;
112 }
113 }
114 watermark_to_emit
115 }
116
117 pub fn remove_buffer(&mut self, buffer_ids_to_remove: HashSet<Id>) -> Option<Watermark> {
119 self.first_buffered_watermarks
120 .retain(|Reverse((_, id))| !buffer_ids_to_remove.contains(id));
121 self.other_buffered_watermarks
122 .retain(|id, _| !buffer_ids_to_remove.contains(id));
123 self.check_watermark_heap()
126 }
127}
128
#[cfg(test)]
mod tests {
    use risingwave_common::types::{DataType, ScalarImpl};

    use super::{BufferedWatermarks, MAX_STAGED_WATERMARKS_PER_INPUT, Watermark};

    // Shorthand: an `Int64` watermark on column 0 with value `v`.
    fn wm(v: i64) -> Watermark {
        Watermark::new(0, DataType::Int64, ScalarImpl::Int64(v))
    }

    #[test]
    fn test_staged_watermarks_capped() {
        // Two inputs; input 2 stalls while input 1 keeps producing.
        let mut stalled_case = BufferedWatermarks::with_ids([1_u8, 2_u8]);
        // Input 1's first watermark enters the heap; nothing can be emitted
        // until input 2 also reports.
        assert!(stalled_case.handle_watermark(1, wm(1)).is_none());

        // Push cap + 4 more watermarks (values 2..=cap+5) from input 1; all of
        // them go into the staging queue since input 1 is already in the heap.
        for i in 2..=(MAX_STAGED_WATERMARKS_PER_INPUT as i64 + 5) {
            assert!(stalled_case.handle_watermark(1, wm(i)).is_none());
        }

        // The queue is capped: the 4 oldest staged entries (2..=5) were
        // evicted, leaving exactly cap entries 6..=cap+5 in order.
        let staged = &stalled_case
            .other_buffered_watermarks
            .get(&1)
            .unwrap()
            .staged;
        assert_eq!(staged.len(), MAX_STAGED_WATERMARKS_PER_INPUT);
        assert_eq!(staged.front().unwrap().val, ScalarImpl::Int64(6));
        assert_eq!(
            staged.back().unwrap().val,
            ScalarImpl::Int64(MAX_STAGED_WATERMARKS_PER_INPUT as i64 + 5)
        );

        // Once input 2 catches up, every one of its watermarks finalizes the
        // global minimum and is emitted, draining input 1's staged queue.
        for i in 1..=(MAX_STAGED_WATERMARKS_PER_INPUT as i64 + 5) {
            let emitted = stalled_case
                .handle_watermark(2, wm(i))
                .expect("watermark should be emitted when stalled upstream becomes active again");
            assert_eq!(emitted.val, ScalarImpl::Int64(i));
        }
        // All buffered state has been fully drained.
        let staged_1 = &stalled_case
            .other_buffered_watermarks
            .get(&1)
            .unwrap()
            .staged;
        let staged_2 = &stalled_case
            .other_buffered_watermarks
            .get(&2)
            .unwrap()
            .staged;
        assert!(staged_1.is_empty());
        assert!(staged_2.is_empty());
        assert!(stalled_case.first_buffered_watermarks.is_empty());
    }
}
184}