risingwave_stream/common/
change_buffer.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::LazyLock;
16
17use indexmap::IndexMap;
18use indexmap::map::Entry;
19use risingwave_common::array::stream_record::Record;
20use risingwave_common::array::{Op, StreamChunk, StreamChunkBuilder};
21use risingwave_common::log::LogSuppresser;
22use risingwave_common::row::Row;
23use risingwave_common::types::DataType;
24
25use crate::consistency::consistency_panic;
26
/// How a [`ChangeBuffer`] reacts when the changes fed into it are inconsistent
/// (for example, two inserts for the same key with no delete in between).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InconsistencyBehavior {
    /// Escalate through `consistency_panic!`.
    Panic,
    /// Emit a `tracing` warning, subject to a log suppresser.
    Warn,
    /// Ignore the inconsistency silently.
    Tolerate,
}
34
35impl InconsistencyBehavior {
36    /// Report an inconsistency.
37    #[track_caller]
38    pub fn report(self, msg: &str) {
39        match self {
40            InconsistencyBehavior::Panic => consistency_panic!("{}", msg),
41            InconsistencyBehavior::Warn => {
42                static LOG_SUPPERSSER: LazyLock<LogSuppresser> =
43                    LazyLock::new(LogSuppresser::default);
44
45                if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
46                    tracing::warn!(suppressed_count, "{}", msg);
47                }
48            }
49            InconsistencyBehavior::Tolerate => {}
50        }
51    }
52}
53
mod private {
    //! Bound aliases for the key and row types of `ChangeBuffer`.
    //!
    //! They live in a non-public module so that code outside the enclosing module
    //! cannot name them; every type meeting the listed bounds implements them through
    //! the blanket impls below.

    use std::hash::Hash;

    /// Anything usable as an `IndexMap` key: `Eq + Hash`.
    pub trait Key: Eq + Hash {}
    impl<T: Eq + Hash> Key for T {}

    /// Anything storable as a buffered row: comparable (`Eq`) and takeable (`Default`).
    pub trait Row: Eq + Default {}
    impl<T: Eq + Default> Row for T {}
}
61
62/// A buffer that accumulates changes and produce compacted changes.
63#[derive(Debug)]
64pub struct ChangeBuffer<K, R> {
65    // We use an `IndexMap` to preserve the original order of the changes as much as possible.
66    buffer: IndexMap<K, Record<R>>,
67    ib: InconsistencyBehavior,
68}
69
70impl<K, R> ChangeBuffer<K, R>
71where
72    K: private::Key,
73    R: private::Row,
74{
75    /// Apply an insertion of a row with the given key.
76    pub fn insert(&mut self, key: K, new_row: R) {
77        let entry = self.buffer.entry(key);
78        match entry {
79            Entry::Vacant(e) => {
80                e.insert(Record::Insert { new_row });
81            }
82            Entry::Occupied(mut e) => match e.get_mut() {
83                Record::Delete { old_row } => {
84                    let old_row = std::mem::take(old_row);
85                    e.insert(Record::Update { old_row, new_row });
86                }
87                Record::Insert { new_row: dst } => {
88                    self.ib.report("inconsistent changes: double-inserting");
89                    *dst = new_row;
90                }
91                Record::Update { new_row: dst, .. } => {
92                    self.ib.report("inconsistent changes: double-inserting");
93                    *dst = new_row;
94                }
95            },
96        }
97    }
98
99    /// Apply a deletion of a row with the given key.
100    pub fn delete(&mut self, key: K, old_row: R) {
101        let entry = self.buffer.entry(key);
102        match entry {
103            Entry::Vacant(e) => {
104                e.insert(Record::Delete { old_row });
105            }
106            Entry::Occupied(mut e) => match e.get_mut() {
107                Record::Insert { .. } => {
108                    // FIXME: though preserving the order well,
109                    // this is not performant compared to `swap_remove`
110                    e.shift_remove();
111                }
112                Record::Update { old_row, .. } => {
113                    let old_row = std::mem::take(old_row);
114                    e.insert(Record::Delete { old_row });
115                }
116                Record::Delete { old_row: dst } => {
117                    self.ib.report("inconsistent changes: double-deleting");
118                    *dst = old_row;
119                }
120            },
121        }
122    }
123
124    /// Apply an update of a row with the given key.
125    pub fn update(&mut self, key: K, old_row: R, new_row: R) {
126        let entry = self.buffer.entry(key);
127        match entry {
128            Entry::Vacant(e) => {
129                e.insert(Record::Update { old_row, new_row });
130            }
131            Entry::Occupied(mut e) => match e.get_mut() {
132                Record::Insert { .. } => {
133                    e.insert(Record::Insert { new_row });
134                }
135                Record::Update { new_row: dst, .. } => {
136                    *dst = new_row;
137                }
138                Record::Delete { .. } => {
139                    self.ib.report("inconsistent changes: update after delete");
140                    e.insert(Record::Update { old_row, new_row });
141                }
142            },
143        }
144    }
145
146    /// Apply a change record, with the key extracted by the given function.
147    ///
148    /// For `Record::Update`, inconsistency is reported if the old key and the new key are different.
149    /// Further behavior is determined by the `InconsistencyBehavior`.
150    pub fn apply_record(&mut self, record: Record<R>, key_fn: impl Fn(&R) -> K) {
151        match record {
152            Record::Insert { new_row } => self.insert(key_fn(&new_row), new_row),
153            Record::Delete { old_row } => self.delete(key_fn(&old_row), old_row),
154            Record::Update { old_row, new_row } => {
155                let old_key = key_fn(&old_row);
156                let new_key = key_fn(&new_row);
157
158                // As long as `ib` is not `Panic`, we still gracefully handle the mismatched key.
159                if old_key != new_key {
160                    self.ib
161                        .report("inconsistent changes: mismatched key in update");
162                    self.delete(old_key, old_row);
163                    self.insert(new_key, new_row);
164                } else {
165                    self.update(old_key, old_row, new_row);
166                }
167            }
168        }
169    }
170
171    /// Apply an `Op` of a row with the given key.
172    pub fn apply_op_row(&mut self, op: Op, key: K, row: R) {
173        match op {
174            Op::Insert | Op::UpdateInsert => self.insert(key, row),
175            Op::Delete | Op::UpdateDelete => self.delete(key, row),
176        }
177    }
178
179    /// Consume the buffer and produce a list of change records.
180    ///
181    /// No-op updates are filtered out.
182    pub fn into_records(self) -> impl Iterator<Item = Record<R>> {
183        self.buffer.into_values().filter(|record| match record {
184            Record::Insert { .. } => true,
185            Record::Delete { .. } => true,
186            Record::Update { old_row, new_row } => old_row != new_row,
187        })
188    }
189}
190
191impl<K, R> Default for ChangeBuffer<K, R> {
192    fn default() -> Self {
193        Self::new()
194    }
195}
196
197impl<K, R> ChangeBuffer<K, R> {
198    /// Create a new `ChangeBuffer` that panics on inconsistency.
199    pub fn new() -> Self {
200        Self::with_capacity(0)
201    }
202
203    /// Create a new `ChangeBuffer` with the given capacity that panics on inconsistency.
204    pub fn with_capacity(capacity: usize) -> Self {
205        Self {
206            buffer: IndexMap::with_capacity(capacity),
207            ib: InconsistencyBehavior::Panic,
208        }
209    }
210
211    /// Set the inconsistency behavior.
212    pub fn with_inconsistency_behavior(mut self, ib: InconsistencyBehavior) -> Self {
213        self.ib = ib;
214        self
215    }
216
217    /// Get the number of keys that have pending changes in the buffer.
218    pub fn len(&self) -> usize {
219        self.buffer.len()
220    }
221
222    /// Check if the buffer is empty.
223    pub fn is_empty(&self) -> bool {
224        self.buffer.is_empty()
225    }
226}
227
228impl<K, R> ChangeBuffer<K, R>
229where
230    K: private::Key,
231    R: private::Row + Row,
232{
233    /// Consume the buffer and produce a single compacted chunk.
234    pub fn into_chunk(self, data_types: Vec<DataType>) -> Option<StreamChunk> {
235        let mut builder = StreamChunkBuilder::unlimited(data_types, Some(self.buffer.len()));
236        for record in self.into_records() {
237            let none = builder.append_record(record);
238            debug_assert!(none.is_none());
239        }
240        builder.take()
241    }
242
243    /// Consume the buffer and produce a list of compacted chunks with the given size at most.
244    pub fn into_chunks(self, data_types: Vec<DataType>, chunk_size: usize) -> Vec<StreamChunk> {
245        let mut res = Vec::new();
246        let mut builder = StreamChunkBuilder::new(chunk_size, data_types);
247        for record in self.into_records() {
248            if let Some(chunk) = builder.append_record(record) {
249                res.push(chunk);
250            }
251        }
252        res.extend(builder.take());
253        res
254    }
255}