risingwave_stream/common/
change_buffer.rs1use std::sync::LazyLock;
16
17use indexmap::IndexMap;
18use indexmap::map::Entry;
19use risingwave_common::array::stream_record::Record;
20use risingwave_common::array::{Op, StreamChunk, StreamChunkBuilder};
21use risingwave_common::log::LogSuppresser;
22use risingwave_common::row::Row;
23use risingwave_common::types::DataType;
24
25use crate::consistency::consistency_panic;
26
/// How a `ChangeBuffer` reacts when an incoming change contradicts what it has already
/// buffered for the same key (for example, inserting a key that is already present).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InconsistencyBehavior {
    /// Escalate through the crate's `consistency_panic!` macro.
    Panic,
    /// Emit a `tracing` warning (gated by a log suppresser) and carry on.
    Warn,
    /// Carry on without reporting anything.
    Tolerate,
}
34
35impl InconsistencyBehavior {
36 #[track_caller]
38 pub fn report(self, msg: &str) {
39 match self {
40 InconsistencyBehavior::Panic => consistency_panic!("{}", msg),
41 InconsistencyBehavior::Warn => {
42 static LOG_SUPPERSSER: LazyLock<LogSuppresser> =
43 LazyLock::new(LogSuppresser::default);
44
45 if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
46 tracing::warn!(suppressed_count, "{}", msg);
47 }
48 }
49 InconsistencyBehavior::Tolerate => {}
50 }
51 }
52}
53
/// Bound aliases for `ChangeBuffer`'s type parameters.
///
/// The traits live in a private module so they can appear in public bounds without being
/// nameable (and therefore implementable or usable) outside this file.
mod private {
    use std::hash::Hash;

    /// Anything usable as an `IndexMap` key: comparable for equality and hashable.
    pub trait Key: Eq + Hash {}
    impl<T: Eq + Hash> Key for T {}

    /// Anything storable as a buffered row: comparable for equality (so no-op updates can be
    /// detected) and default-constructible (so rows can be moved out with `std::mem::take`).
    pub trait Row: Eq + Default {}
    impl<T: Eq + Default> Row for T {}
}
61
62#[derive(Debug)]
64pub struct ChangeBuffer<K, R> {
65 buffer: IndexMap<K, Record<R>>,
67 ib: InconsistencyBehavior,
68}
69
70impl<K, R> ChangeBuffer<K, R>
71where
72 K: private::Key,
73 R: private::Row,
74{
75 pub fn insert(&mut self, key: K, new_row: R) {
77 let entry = self.buffer.entry(key);
78 match entry {
79 Entry::Vacant(e) => {
80 e.insert(Record::Insert { new_row });
81 }
82 Entry::Occupied(mut e) => match e.get_mut() {
83 Record::Delete { old_row } => {
84 let old_row = std::mem::take(old_row);
85 e.insert(Record::Update { old_row, new_row });
86 }
87 Record::Insert { new_row: dst } => {
88 self.ib.report("inconsistent changes: double-inserting");
89 *dst = new_row;
90 }
91 Record::Update { new_row: dst, .. } => {
92 self.ib.report("inconsistent changes: double-inserting");
93 *dst = new_row;
94 }
95 },
96 }
97 }
98
99 pub fn delete(&mut self, key: K, old_row: R) {
101 let entry = self.buffer.entry(key);
102 match entry {
103 Entry::Vacant(e) => {
104 e.insert(Record::Delete { old_row });
105 }
106 Entry::Occupied(mut e) => match e.get_mut() {
107 Record::Insert { .. } => {
108 e.shift_remove();
111 }
112 Record::Update { old_row, .. } => {
113 let old_row = std::mem::take(old_row);
114 e.insert(Record::Delete { old_row });
115 }
116 Record::Delete { old_row: dst } => {
117 self.ib.report("inconsistent changes: double-deleting");
118 *dst = old_row;
119 }
120 },
121 }
122 }
123
124 pub fn update(&mut self, key: K, old_row: R, new_row: R) {
126 let entry = self.buffer.entry(key);
127 match entry {
128 Entry::Vacant(e) => {
129 e.insert(Record::Update { old_row, new_row });
130 }
131 Entry::Occupied(mut e) => match e.get_mut() {
132 Record::Insert { .. } => {
133 e.insert(Record::Insert { new_row });
134 }
135 Record::Update { new_row: dst, .. } => {
136 *dst = new_row;
137 }
138 Record::Delete { .. } => {
139 self.ib.report("inconsistent changes: update after delete");
140 e.insert(Record::Update { old_row, new_row });
141 }
142 },
143 }
144 }
145
146 pub fn apply_record(&mut self, record: Record<R>, key_fn: impl Fn(&R) -> K) {
151 match record {
152 Record::Insert { new_row } => self.insert(key_fn(&new_row), new_row),
153 Record::Delete { old_row } => self.delete(key_fn(&old_row), old_row),
154 Record::Update { old_row, new_row } => {
155 let old_key = key_fn(&old_row);
156 let new_key = key_fn(&new_row);
157
158 if old_key != new_key {
160 self.ib
161 .report("inconsistent changes: mismatched key in update");
162 self.delete(old_key, old_row);
163 self.insert(new_key, new_row);
164 } else {
165 self.update(old_key, old_row, new_row);
166 }
167 }
168 }
169 }
170
171 pub fn apply_op_row(&mut self, op: Op, key: K, row: R) {
173 match op {
174 Op::Insert | Op::UpdateInsert => self.insert(key, row),
175 Op::Delete | Op::UpdateDelete => self.delete(key, row),
176 }
177 }
178
179 pub fn into_records(self) -> impl Iterator<Item = Record<R>> {
183 self.buffer.into_values().filter(|record| match record {
184 Record::Insert { .. } => true,
185 Record::Delete { .. } => true,
186 Record::Update { old_row, new_row } => old_row != new_row,
187 })
188 }
189}
190
191impl<K, R> Default for ChangeBuffer<K, R> {
192 fn default() -> Self {
193 Self::new()
194 }
195}
196
197impl<K, R> ChangeBuffer<K, R> {
198 pub fn new() -> Self {
200 Self::with_capacity(0)
201 }
202
203 pub fn with_capacity(capacity: usize) -> Self {
205 Self {
206 buffer: IndexMap::with_capacity(capacity),
207 ib: InconsistencyBehavior::Panic,
208 }
209 }
210
211 pub fn with_inconsistency_behavior(mut self, ib: InconsistencyBehavior) -> Self {
213 self.ib = ib;
214 self
215 }
216
217 pub fn len(&self) -> usize {
219 self.buffer.len()
220 }
221
222 pub fn is_empty(&self) -> bool {
224 self.buffer.is_empty()
225 }
226}
227
228impl<K, R> ChangeBuffer<K, R>
229where
230 K: private::Key,
231 R: private::Row + Row,
232{
233 pub fn into_chunk(self, data_types: Vec<DataType>) -> Option<StreamChunk> {
235 let mut builder = StreamChunkBuilder::unlimited(data_types, Some(self.buffer.len()));
236 for record in self.into_records() {
237 let none = builder.append_record(record);
238 debug_assert!(none.is_none());
239 }
240 builder.take()
241 }
242
243 pub fn into_chunks(self, data_types: Vec<DataType>, chunk_size: usize) -> Vec<StreamChunk> {
245 let mut res = Vec::new();
246 let mut builder = StreamChunkBuilder::new(chunk_size, data_types);
247 for record in self.into_records() {
248 if let Some(chunk) = builder.append_record(record) {
249 res.push(chunk);
250 }
251 }
252 res.extend(builder.take());
253 res
254 }
255}