risingwave_stream/common/compact_chunk.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::hash::BuildHasherDefault;
use std::mem;
use std::sync::LazyLock;

use itertools::Itertools;
use prehash::{Passthru, Prehashed, new_prehashed_map_with_capacity};
use risingwave_common::array::stream_chunk::{OpRowMutRef, StreamChunkMut};
use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{Op, RowRef, StreamChunk};
use risingwave_common::log::LogSuppresser;
use risingwave_common::row::{Project, RowExt};
use risingwave_common::types::DataType;
use risingwave_common::util::hash_util::Crc32FastBuilder;

use crate::consistency::consistency_panic;

// XXX(bugen): This utility seems confusing. It's doing different things with different methods,
// while all of them are named "compact" (also note `StreamChunk::compact`). We should consider
// refactoring it.
//
// Basically,
// - `StreamChunk::compact`: construct a new chunk by removing invisible rows.
// - `StreamChunkCompactor::into_compacted_chunks`: hide intermediate operations of the same key
//   by modifying the visibility, while preserving the original chunk structure.
// - `StreamChunkCompactor::reconstructed_compacted_chunks`: filter out intermediate operations
//   of the same key, construct new chunks. A combination of `into_compacted_chunks`, `compact`,
//   and `StreamChunkBuilder`.
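//
// For example (illustrative), given the ops `+ k 1`, `- k 1`, `+ k 2` on the same stream key `k`:
// - `into_compacted_chunks` keeps the chunk layout and hides the first two rows via visibility,
//   leaving only `+ k 2` visible;
// - `reconstructed_compacted_chunks` builds brand-new chunks containing just `+ k 2`;
// - applying `StreamChunk::compact` to the former result would then drop the now-invisible rows.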

/// Behavior when inconsistency is detected during compaction.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum InconsistencyBehavior {
    /// Report via `consistency_panic!`.
    Panic,
    /// Log a rate-limited warning.
    Warn,
    /// Silently ignore the inconsistency.
    Tolerate,
}

impl InconsistencyBehavior {
    /// Report an inconsistency.
    #[track_caller]
    fn report(self, msg: &str) {
        match self {
            InconsistencyBehavior::Panic => consistency_panic!("{}", msg),
            InconsistencyBehavior::Warn => {
                static LOG_SUPPRESSER: LazyLock<LogSuppresser> =
                    LazyLock::new(LogSuppresser::default);

                if let Ok(suppressed_count) = LOG_SUPPRESSER.check() {
                    tracing::warn!(suppressed_count, "{}", msg);
                }
            }
            InconsistencyBehavior::Tolerate => {}
        }
    }
}

/// A helper to compact stream chunks by modifying the ops and the visibility of the chunks.
pub struct StreamChunkCompactor {
    chunks: Vec<StreamChunk>,
    key: Vec<usize>,
}

/// The mutable op/row references currently kept for one key: `prev` is the latest visible op,
/// and `before_prev` is a pending `Delete` that may later pair with an `Insert` to form an
/// update.
struct OpRowMutRefTuple<'a> {
    before_prev: Option<OpRowMutRef<'a>>,
    prev: OpRowMutRef<'a>,
}

impl<'a> OpRowMutRefTuple<'a> {
    /// Returns `true` if no visible row is left for this key.
    fn push(&mut self, mut curr: OpRowMutRef<'a>, ib: InconsistencyBehavior) -> bool {
        debug_assert!(self.prev.vis());
        match (self.prev.op(), curr.op()) {
            (Op::Insert, Op::Insert) => {
                ib.report("receive duplicated insert on the stream");
                // If we tolerate the inconsistency, override the previous insert.
                // Note that because the primary key constraint has been violated, we
                // don't mind losing some data here.
                self.prev.set_vis(false);
                self.prev = curr;
            }
            (Op::Delete, Op::Delete) => {
                ib.report("receive duplicated delete on the stream");
                // If we tolerate the inconsistency, override the previous delete.
                // Note that because the primary key constraint has been violated, we
                // don't mind losing some data here.
                self.prev.set_vis(false);
                self.prev = curr;
            }
            (Op::Insert, Op::Delete) => {
                // Deleting a row that has just been inserted: hide both ops.
                self.prev.set_vis(false);
                curr.set_vis(false);
                self.prev = if let Some(prev) = self.before_prev.take() {
                    prev
                } else {
                    return true;
                }
            }
            (Op::Delete, Op::Insert) => {
                // The op sequence for the key must be `(+, -, +)` or `(-, +)`, and any `(+, -)`
                // pair must have already been filtered out by the arm above.
                debug_assert!(
                    self.before_prev.is_none(),
                    "should have been taken in the above match arm"
                );
                self.before_prev = Some(mem::replace(&mut self.prev, curr));
            }
            // All `UpdateDelete` and `UpdateInsert` ops should have been normalized to `Delete`
            // and `Insert` before being pushed here.
            _ => unreachable!(),
        };
        false
    }

    fn as_update_op(&mut self) -> Option<(&mut OpRowMutRef<'a>, &mut OpRowMutRef<'a>)> {
        self.before_prev.as_mut().map(|prev| {
            debug_assert_eq!(prev.op(), Op::Delete);
            debug_assert_eq!(self.prev.op(), Op::Insert);
            (prev, &mut self.prev)
        })
    }
}

/// Maps a prehashed key projection to the (up to two) mutable op/row references still kept
/// visible for that key.
type OpRowMap<'a, 'b> =
    HashMap<Prehashed<Project<'b, RowRef<'a>>>, OpRowMutRefTuple<'a>, BuildHasherDefault<Passthru>>;

#[derive(Clone, Debug)]
pub enum RowOp<'a> {
    Insert(RowRef<'a>),
    Delete(RowRef<'a>),
    /// (`old_value`, `new_value`)
    Update((RowRef<'a>, RowRef<'a>)),
}

/// Accumulates the net [`RowOp`] per key across chunks, reporting inconsistencies according to
/// the given [`InconsistencyBehavior`].
pub struct RowOpMap<'a, 'b> {
    map: HashMap<Prehashed<Project<'b, RowRef<'a>>>, RowOp<'a>, BuildHasherDefault<Passthru>>,
    ib: InconsistencyBehavior,
}

impl<'a, 'b> RowOpMap<'a, 'b> {
    fn with_capacity(estimate_size: usize, ib: InconsistencyBehavior) -> Self {
        Self {
            map: new_prehashed_map_with_capacity(estimate_size),
            ib,
        }
    }

    /// Record an insert of row `v` under key `k`, merging it with any existing op for the key.
    pub fn insert(&mut self, k: Prehashed<Project<'b, RowRef<'a>>>, v: RowRef<'a>) {
        let entry = self.map.entry(k);
        match entry {
            Entry::Vacant(e) => {
                e.insert(RowOp::Insert(v));
            }
            Entry::Occupied(mut e) => match e.get() {
                RowOp::Delete(old_v) => {
                    e.insert(RowOp::Update((*old_v, v)));
                }
                RowOp::Insert(_) => {
                    self.ib
                        .report("double insert for the same pk, breaking the sink's pk constraint");
                    e.insert(RowOp::Insert(v));
                }
                RowOp::Update((old_v, _)) => {
                    self.ib
                        .report("double insert for the same pk, breaking the sink's pk constraint");
                    e.insert(RowOp::Update((*old_v, v)));
                }
            },
        }
    }

    /// Record a delete of row `v` under key `k`, merging it with any existing op for the key.
    pub fn delete(&mut self, k: Prehashed<Project<'b, RowRef<'a>>>, v: RowRef<'a>) {
        let entry = self.map.entry(k);
        match entry {
            Entry::Vacant(e) => {
                e.insert(RowOp::Delete(v));
            }
            Entry::Occupied(mut e) => match e.get() {
                RowOp::Insert(_) => {
                    e.remove();
                }
                RowOp::Update((prev, _)) => {
                    e.insert(RowOp::Delete(*prev));
                }
                RowOp::Delete(_) => {
                    self.ib.report("double delete for the same pk");
                    e.insert(RowOp::Delete(v));
                }
            },
        }
    }

    /// Consume the map and build new stream chunks of at most `chunk_size` rows.
    /// Updates whose old and new rows are identical are dropped.
    pub fn into_chunks(self, chunk_size: usize, data_types: Vec<DataType>) -> Vec<StreamChunk> {
        let mut ret = vec![];
        let mut builder = StreamChunkBuilder::new(chunk_size, data_types);
        for (_, row_op) in self.map {
            match row_op {
                RowOp::Insert(row) => {
                    if let Some(c) = builder.append_record(Record::Insert { new_row: row }) {
                        ret.push(c)
                    }
                }
                RowOp::Delete(row) => {
                    if let Some(c) = builder.append_record(Record::Delete { old_row: row }) {
                        ret.push(c)
                    }
                }
                RowOp::Update((old, new)) => {
                    if old == new {
                        continue;
                    }
                    if let Some(c) = builder.append_record(Record::Update {
                        old_row: old,
                        new_row: new,
                    }) {
                        ret.push(c)
                    }
                }
            }
        }
        if let Some(c) = builder.take() {
            ret.push(c);
        }
        ret
    }
}

impl StreamChunkCompactor {
    pub fn new(key: Vec<usize>, chunks: Vec<StreamChunk>) -> Self {
        Self { chunks, key }
    }

    pub fn into_inner(self) -> (Vec<StreamChunk>, Vec<usize>) {
        (self.chunks, self.key)
    }

    /// Compact the chunks by modifying the ops and the visibility of the stream chunks.
    /// Currently, two transformations are applied:
    /// - Intermediate operations on the same key are removed, so that the operations on each
    ///   stream key are reduced to a single `Insert`, `Delete`, or `Update`.
    /// - An update `(- old row, + old row)` whose old and new rows are identical is removed
    ///   entirely.
    pub fn into_compacted_chunks(
        self,
        ib: InconsistencyBehavior,
    ) -> impl Iterator<Item = StreamChunk> {
        let (chunks, key_indices) = self.into_inner();

        let estimate_size = chunks.iter().map(|c| c.cardinality()).sum();
        let mut chunks: Vec<(Vec<u64>, StreamChunkMut)> = chunks
            .into_iter()
            .map(|c| {
                let hash_values = c
                    .data_chunk()
                    .get_hash_values(&key_indices, Crc32FastBuilder)
                    .into_iter()
                    .map(|hash| hash.value())
                    .collect_vec();
                (hash_values, StreamChunkMut::from(c))
            })
            .collect_vec();

        // First pass: group rows by key hash and collapse operations on the same key via
        // `OpRowMutRefTuple`, hiding intermediate ones through visibility.
        let mut op_row_map: OpRowMap<'_, '_> = new_prehashed_map_with_capacity(estimate_size);
        for (hash_values, c) in &mut chunks {
            for (row, mut op_row) in c.to_rows_mut() {
                op_row.set_op(op_row.op().normalize_update());
                let hash = hash_values[row.index()];
                let key = row.project(&key_indices);
                match op_row_map.entry(Prehashed::new(key, hash)) {
                    Entry::Vacant(v) => {
                        v.insert(OpRowMutRefTuple {
                            before_prev: None,
                            prev: op_row,
                        });
                    }
                    Entry::Occupied(mut o) => {
                        if o.get_mut().push(op_row, ib) {
                            o.remove_entry();
                        }
                    }
                }
            }
        }
        // Second pass: for each remaining (delete, insert) pair, drop it if the rows are
        // identical, or mark it as an `UpdateDelete` / `UpdateInsert` pair if the two rows
        // are adjacent in the same chunk.
        for tuple in op_row_map.values_mut() {
            if let Some((prev, latest)) = tuple.as_update_op() {
                if prev.row_ref() == latest.row_ref() {
                    prev.set_vis(false);
                    latest.set_vis(false);
                } else if prev.same_chunk(latest) && prev.index() + 1 == latest.index() {
                    // TODO(st1page): use next_one check in bitmap
                    prev.set_op(Op::UpdateDelete);
                    latest.set_op(Op::UpdateInsert);
                }
            }
        }
        chunks.into_iter().map(|(_, c)| c.into())
    }

    /// Re-construct the stream chunks, compacting operations on the same key across all input
    /// chunks and emitting brand-new chunks of at most `chunk_size` rows.
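    ///
    /// Unlike [`Self::into_compacted_chunks`], the original chunk boundaries and row order are
    /// not preserved, since the surviving operations are re-emitted from a map keyed on the
    /// stream key.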
    pub fn reconstructed_compacted_chunks(
        self,
        chunk_size: usize,
        data_types: Vec<DataType>,
        ib: InconsistencyBehavior,
    ) -> Vec<StreamChunk> {
        let (chunks, key_indices) = self.into_inner();

        let estimate_size = chunks.iter().map(|c| c.cardinality()).sum();
        let chunks: Vec<(_, _, _)> = chunks
            .into_iter()
            .map(|c| {
                let (c, ops) = c.into_parts();
                let hash_values = c
                    .get_hash_values(&key_indices, Crc32FastBuilder)
                    .into_iter()
                    .map(|hash| hash.value())
                    .collect_vec();
                (hash_values, ops, c)
            })
            .collect_vec();
        let mut map = RowOpMap::with_capacity(estimate_size, ib);
        for (hash_values, ops, c) in &chunks {
            for row in c.rows() {
                let hash = hash_values[row.index()];
                let op = ops[row.index()];
                let key = row.project(&key_indices);
                let k = Prehashed::new(key, hash);
                match op {
                    Op::Insert | Op::UpdateInsert => map.insert(k, row),
                    Op::Delete | Op::UpdateDelete => map.delete(k, row),
                }
            }
        }
        map.into_chunks(chunk_size, data_types)
    }
}
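
// A minimal usage sketch (illustrative; `chunks`, `key_indices`, `chunk_size` and `data_types`
// come from the caller):
//
//     let compacted = StreamChunkCompactor::new(key_indices, chunks)
//         .reconstructed_compacted_chunks(chunk_size, data_types, InconsistencyBehavior::Warn);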
353
354pub fn merge_chunk_row(
355    stream_chunk: StreamChunk,
356    pk_indices: &[usize],
357    ib: InconsistencyBehavior,
358) -> StreamChunk {
359    let compactor = StreamChunkCompactor::new(pk_indices.to_vec(), vec![stream_chunk]);
360    compactor.into_compacted_chunks(ib).next().unwrap()
361}

#[cfg(test)]
mod tests {
    use risingwave_common::test_prelude::StreamChunkTestExt;

    use super::*;

    #[test]
    fn test_merge_chunk_row() {
        let pk_indices = [0, 1];
        let chunks = vec![
            StreamChunk::from_pretty(
                " I I I
                - 1 1 1
                + 1 1 2
                + 2 5 7
                + 4 9 2
                - 2 5 7
                + 2 5 5
                - 6 6 9
                + 6 6 9
                - 9 9 1",
            ),
            StreamChunk::from_pretty(
                " I I I
                - 6 6 9
                + 9 9 9
                - 9 9 4
                + 2 2 2
                + 9 9 1",
            ),
        ];
        let compactor = StreamChunkCompactor::new(pk_indices.to_vec(), chunks);
        let mut iter = compactor.into_compacted_chunks(InconsistencyBehavior::Panic);
        assert_eq!(
            iter.next().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                U- 1 1 1
                U+ 1 1 2
                + 4 9 2
                + 2 5 5
                - 6 6 9",
            )
        );
        assert_eq!(
            iter.next().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                + 2 2 2",
            )
        );

        assert_eq!(iter.next(), None);
    }

    #[test]
    fn test_compact_chunk_row() {
        let pk_indices = [0, 1];
        let chunks = vec![
            StreamChunk::from_pretty(
                " I I I
                - 1 1 1
                + 1 1 2
                + 2 5 7
                + 4 9 2
                - 2 5 7
                + 2 5 5
                - 6 6 9
                + 6 6 9
                - 9 9 1",
            ),
            StreamChunk::from_pretty(
                " I I I
                - 6 6 9
                + 9 9 9
                - 9 9 4
                + 2 2 2
                + 9 9 1",
            ),
        ];
        let compactor = StreamChunkCompactor::new(pk_indices.to_vec(), chunks);

        let chunks = compactor.reconstructed_compacted_chunks(
            100,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            InconsistencyBehavior::Panic,
        );
        assert_eq!(
            chunks.into_iter().next().unwrap(),
            StreamChunk::from_pretty(
                " I I I
                 + 2 5 5
                 - 6 6 9
                 + 4 9 2
                U- 1 1 1
                U+ 1 1 2
                 + 2 2 2",
            )
        );
    }
}