risingwave_stream/common/
compact_chunk.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::hash::BuildHasherDefault;
18use std::mem;
19use std::sync::LazyLock;
20
21use itertools::Itertools;
22use prehash::{Passthru, Prehashed, new_prehashed_map_with_capacity};
23use risingwave_common::array::stream_chunk::{OpRowMutRef, StreamChunkMut};
24use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
25use risingwave_common::array::stream_record::Record;
26use risingwave_common::array::{Op, RowRef, StreamChunk};
27use risingwave_common::log::LogSuppresser;
28use risingwave_common::row::{Project, RowExt};
29use risingwave_common::types::DataType;
30use risingwave_common::util::hash_util::Crc32FastBuilder;
31
32use crate::consistency::consistency_panic;
33
/// A helper to compact the stream chunks by modifying the `Ops` and visibility of the chunk.
pub struct StreamChunkCompactor {
    // The chunks to compact, in stream order.
    chunks: Vec<StreamChunk>,
    // Column indices forming the stream key used to group rows.
    key: Vec<usize>,
}
39
/// Holds the (at most two) still-visible op-rows for one stream key while
/// compacting in place: an optional earlier `Delete` plus the latest op.
struct OpRowMutRefTuple<'a> {
    // An earlier `Delete` kept when followed by an `Insert` (a potential update pair).
    before_prev: Option<OpRowMutRef<'a>>,
    // The most recent visible op-row for this key.
    prev: OpRowMutRef<'a>,
}
44
impl<'a> OpRowMutRefTuple<'a> {
    /// Merges the next op-row `curr` for the same stream key into this tuple,
    /// cancelling or folding ops where possible.
    ///
    /// return true if no row left
    fn push(&mut self, mut curr: OpRowMutRef<'a>) -> bool {
        debug_assert!(self.prev.vis());
        match (self.prev.op(), curr.op()) {
            (Op::Insert, Op::Insert) => {
                consistency_panic!("receive duplicated insert on the stream");
                // If need to tolerate inconsistency, override the previous insert.
                // Note that because the primary key constraint has been violated, we
                // don't mind losing some data here.
                self.prev.set_vis(false);
                self.prev = curr;
            }
            (Op::Delete, Op::Delete) => {
                consistency_panic!("receive duplicated delete on the stream");
                // If need to tolerate inconsistency, override the previous delete.
                // Note that because the primary key constraint has been violated, we
                // don't mind losing some data here.
                self.prev.set_vis(false);
                self.prev = curr;
            }
            (Op::Insert, Op::Delete) => {
                // Delete a row that has been inserted, just hide the two ops.
                self.prev.set_vis(false);
                curr.set_vis(false);
                self.prev = if let Some(prev) = self.before_prev.take() {
                    prev
                } else {
                    // Nothing left for this key at all — tell the caller to drop the entry.
                    return true;
                }
            }
            (Op::Delete, Op::Insert) => {
                // The operation for the key must be (+, -, +) or (-, +). And the (+, -) must has
                // been filtered.
                debug_assert!(
                    self.before_prev.is_none(),
                    "should have been taken in the above match arm"
                );
                self.before_prev = Some(mem::replace(&mut self.prev, curr));
            }
            // `all the updateDelete` and `updateInsert` should be normalized to `delete`
            // and`insert`
            _ => unreachable!(),
        };
        false
    }

    /// If the pending ops form a `(Delete, Insert)` pair, returns mutable refs to
    /// `(delete, insert)` so the caller can rewrite them into an update pair.
    fn as_update_op(&mut self) -> Option<(&mut OpRowMutRef<'a>, &mut OpRowMutRef<'a>)> {
        self.before_prev.as_mut().map(|prev| {
            debug_assert_eq!(prev.op(), Op::Delete);
            debug_assert_eq!(self.prev.op(), Op::Insert);
            (prev, &mut self.prev)
        })
    }
}
100
/// Maps a pre-hashed stream-key projection to the pending op-rows for that key.
/// `Passthru` reuses the CRC32 hash precomputed per row instead of re-hashing.
type OpRowMap<'a, 'b> =
    HashMap<Prehashed<Project<'b, RowRef<'a>>>, OpRowMutRefTuple<'a>, BuildHasherDefault<Passthru>>;
103
/// The net effect of all ops seen so far for one stream key.
#[derive(Clone, Debug)]
pub enum RowOp<'a> {
    Insert(RowRef<'a>),
    Delete(RowRef<'a>),
    /// (`old_value`, `new_value`)
    Update((RowRef<'a>, RowRef<'a>)),
}
// Rate limiter for inconsistent-stream warnings.
// NOTE(review): name contains a typo ("SUPPERSSER"); renaming would touch every use site.
static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
112
/// Accumulates the net [`RowOp`] per stream key, keyed by pre-hashed key projections.
pub struct RowOpMap<'a, 'b> {
    map: HashMap<Prehashed<Project<'b, RowRef<'a>>>, RowOp<'a>, BuildHasherDefault<Passthru>>,
    // Whether to log (rate-limited) warnings when the stream violates the pk constraint.
    warn_for_inconsistent_stream: bool,
}
117
118impl<'a, 'b> RowOpMap<'a, 'b> {
119    fn with_capacity(estimate_size: usize, warn_for_inconsistent_stream: bool) -> Self {
120        Self {
121            map: new_prehashed_map_with_capacity(estimate_size),
122            warn_for_inconsistent_stream,
123        }
124    }
125
126    pub fn insert(&mut self, k: Prehashed<Project<'b, RowRef<'a>>>, v: RowRef<'a>) {
127        let entry = self.map.entry(k);
128        match entry {
129            Entry::Vacant(e) => {
130                e.insert(RowOp::Insert(v));
131            }
132            Entry::Occupied(mut e) => match e.get() {
133                RowOp::Delete(old_v) => {
134                    e.insert(RowOp::Update((*old_v, v)));
135                }
136                RowOp::Insert(_) => {
137                    if self.warn_for_inconsistent_stream {
138                        if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
139                            tracing::warn!(
140                                suppressed_count,
141                                "double insert for the same pk, breaking the sink's pk constraint"
142                            );
143                        }
144                    }
145                    e.insert(RowOp::Insert(v));
146                }
147                RowOp::Update((old_v, _)) => {
148                    if self.warn_for_inconsistent_stream {
149                        if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
150                            tracing::warn!(
151                                suppressed_count,
152                                "double insert for the same pk, breaking the sink's pk constraint"
153                            );
154                        }
155                    }
156                    e.insert(RowOp::Update((*old_v, v)));
157                }
158            },
159        }
160    }
161
162    pub fn delete(&mut self, k: Prehashed<Project<'b, RowRef<'a>>>, v: RowRef<'a>) {
163        let entry = self.map.entry(k);
164        match entry {
165            Entry::Vacant(e) => {
166                e.insert(RowOp::Delete(v));
167            }
168            Entry::Occupied(mut e) => match e.get() {
169                RowOp::Insert(_) => {
170                    e.remove();
171                }
172                RowOp::Update((prev, _)) => {
173                    e.insert(RowOp::Delete(*prev));
174                }
175                RowOp::Delete(_) => {
176                    if self.warn_for_inconsistent_stream {
177                        if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
178                            tracing::warn!(suppressed_count, "double delete for the same pk");
179                        }
180                    }
181                    e.insert(RowOp::Delete(v));
182                }
183            },
184        }
185    }
186
187    pub fn into_chunks(self, chunk_size: usize, data_types: Vec<DataType>) -> Vec<StreamChunk> {
188        let mut ret = vec![];
189        let mut builder = StreamChunkBuilder::new(chunk_size, data_types);
190        for (_, row_op) in self.map {
191            match row_op {
192                RowOp::Insert(row) => {
193                    if let Some(c) = builder.append_record(Record::Insert { new_row: row }) {
194                        ret.push(c)
195                    }
196                }
197                RowOp::Delete(row) => {
198                    if let Some(c) = builder.append_record(Record::Delete { old_row: row }) {
199                        ret.push(c)
200                    }
201                }
202                RowOp::Update((old, new)) => {
203                    if old == new {
204                        continue;
205                    }
206                    if let Some(c) = builder.append_record(Record::Update {
207                        old_row: old,
208                        new_row: new,
209                    }) {
210                        ret.push(c)
211                    }
212                }
213            }
214        }
215        if let Some(c) = builder.take() {
216            ret.push(c);
217        }
218        ret
219    }
220}
221
impl StreamChunkCompactor {
    /// Creates a compactor over `chunks`, grouping rows by the `key` column indices.
    pub fn new(key: Vec<usize>, chunks: Vec<StreamChunk>) -> Self {
        Self { chunks, key }
    }

    /// Consumes `self`, returning the underlying `(chunks, key)` pair.
    pub fn into_inner(self) -> (Vec<StreamChunk>, Vec<usize>) {
        (self.chunks, self.key)
    }

    /// Compact a chunk by modifying the ops and the visibility of a stream chunk.
    /// Currently, two transformation will be applied
    /// - remove intermediate operation of the same key. The operations of the same stream key will only
    ///   have three kind of patterns Insert, Delete or Update.
    /// - For the update (-old row, +old row), when old row is exactly same. The two rowOp will be
    ///   removed.
    pub fn into_compacted_chunks(self) -> impl Iterator<Item = StreamChunk> {
        let (chunks, key_indices) = self.into_inner();

        // Total row count across all chunks; used to size the op map.
        let estimate_size = chunks.iter().map(|c| c.cardinality()).sum();
        // Precompute one CRC32 hash per row over the key columns so the map can use
        // the pass-through hasher, and make the chunks mutable for in-place edits.
        let mut chunks: Vec<(Vec<u64>, StreamChunkMut)> = chunks
            .into_iter()
            .map(|c| {
                let hash_values = c
                    .data_chunk()
                    .get_hash_values(&key_indices, Crc32FastBuilder)
                    .into_iter()
                    .map(|hash| hash.value())
                    .collect_vec();
                (hash_values, StreamChunkMut::from(c))
            })
            .collect_vec();

        let mut op_row_map: OpRowMap<'_, '_> = new_prehashed_map_with_capacity(estimate_size);
        for (hash_values, c) in &mut chunks {
            for (row, mut op_row) in c.to_rows_mut() {
                // Normalize `UpdateDelete`/`UpdateInsert` to plain `Delete`/`Insert`
                // so `OpRowMutRefTuple::push` only needs to handle two ops.
                op_row.set_op(op_row.op().normalize_update());
                let hash = hash_values[row.index()];
                let key = row.project(&key_indices);
                match op_row_map.entry(Prehashed::new(key, hash)) {
                    Entry::Vacant(v) => {
                        v.insert(OpRowMutRefTuple {
                            before_prev: None,
                            prev: op_row,
                        });
                    }
                    Entry::Occupied(mut o) => {
                        // `push` returns true when all ops for the key cancelled out.
                        if o.get_mut().push(op_row) {
                            o.remove_entry();
                        }
                    }
                }
            }
        }
        // Post-pass: hide surviving (Delete, Insert) pairs whose rows are identical,
        // or mark adjacent pairs in the same chunk as a proper update.
        for tuple in op_row_map.values_mut() {
            if let Some((prev, latest)) = tuple.as_update_op() {
                if prev.row_ref() == latest.row_ref() {
                    prev.set_vis(false);
                    latest.set_vis(false);
                } else if prev.same_chunk(latest) && prev.index() + 1 == latest.index() {
                    // TODO(st1page): use next_one check in bitmap
                    prev.set_op(Op::UpdateDelete);
                    latest.set_op(Op::UpdateInsert);
                }
            }
        }
        chunks.into_iter().map(|(_, c)| c.into())
    }

    /// re-construct the stream chunks to compact them with the key.
    pub fn reconstructed_compacted_chunks(
        self,
        chunk_size: usize,
        data_types: Vec<DataType>,
        warn_for_inconsistent_stream: bool,
    ) -> Vec<StreamChunk> {
        let (chunks, key_indices) = self.into_inner();

        // Total row count across all chunks; used to size the row-op map.
        let estimate_size = chunks.iter().map(|c| c.cardinality()).sum();
        // Split each chunk into (per-row key hashes, ops, data chunk).
        let chunks: Vec<(_, _, _)> = chunks
            .into_iter()
            .map(|c| {
                let (c, ops) = c.into_parts();
                let hash_values = c
                    .get_hash_values(&key_indices, Crc32FastBuilder)
                    .into_iter()
                    .map(|hash| hash.value())
                    .collect_vec();
                (hash_values, ops, c)
            })
            .collect_vec();
        let mut map = RowOpMap::with_capacity(estimate_size, warn_for_inconsistent_stream);
        for (hash_values, ops, c) in &chunks {
            for row in c.rows() {
                let hash = hash_values[row.index()];
                let op = ops[row.index()];
                let key = row.project(&key_indices);
                let k = Prehashed::new(key, hash);
                match op {
                    Op::Insert | Op::UpdateInsert => map.insert(k, row),
                    Op::Delete | Op::UpdateDelete => map.delete(k, row),
                }
            }
        }
        // Build brand-new chunks from the net per-key effects.
        map.into_chunks(chunk_size, data_types)
    }
}
328
329pub fn merge_chunk_row(stream_chunk: StreamChunk, pk_indices: &[usize]) -> StreamChunk {
330    let compactor = StreamChunkCompactor::new(pk_indices.to_vec(), vec![stream_chunk]);
331    compactor.into_compacted_chunks().next().unwrap()
332}
333
#[cfg(test)]
mod tests {
    use risingwave_common::test_prelude::StreamChunkTestExt;

    use super::*;

    // In-place compaction: ops/visibility are rewritten, chunk boundaries are kept.
    #[test]
    fn test_merge_chunk_row() {
        let pk_indices = [0, 1];
        let chunks = vec![
            StreamChunk::from_pretty(
                " I I I
                - 1 1 1
                + 1 1 2
                + 2 5 7
                + 4 9 2
                - 2 5 7
                + 2 5 5
                - 6 6 9
                + 6 6 9
                - 9 9 1",
            ),
            StreamChunk::from_pretty(
                " I I I
                - 6 6 9
                + 9 9 9
                - 9 9 4
                + 2 2 2
                + 9 9 1",
            ),
        ];
        let compactor = StreamChunkCompactor::new(pk_indices.to_vec(), chunks);
        let mut iter = compactor.into_compacted_chunks();
        assert_eq!(
            iter.next().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                U- 1 1 1
                U+ 1 1 2
                + 4 9 2
                + 2 5 5
                - 6 6 9",
            )
        );
        assert_eq!(
            iter.next().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                + 2 2 2",
            )
        );

        assert_eq!(iter.next(), None);
    }

    // Reconstruction: net per-key effects are re-emitted into freshly built chunks.
    #[test]
    fn test_compact_chunk_row() {
        let pk_indices = [0, 1];
        let chunks = vec![
            StreamChunk::from_pretty(
                " I I I
            - 1 1 1
            + 1 1 2
            + 2 5 7
            + 4 9 2
            - 2 5 7
            + 2 5 5
            - 6 6 9
            + 6 6 9
            - 9 9 1",
            ),
            StreamChunk::from_pretty(
                " I I I
            - 6 6 9
            + 9 9 9
            - 9 9 4
            + 2 2 2
            + 9 9 1",
            ),
        ];
        let compactor = StreamChunkCompactor::new(pk_indices.to_vec(), chunks);

        let chunks = compactor.reconstructed_compacted_chunks(
            100,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            true,
        );
        assert_eq!(
            chunks.into_iter().next().unwrap(),
            StreamChunk::from_pretty(
                " I I I
                 + 2 5 5
                 - 6 6 9
                 + 4 9 2
                U- 1 1 1
                U+ 1 1 2
                 + 2 2 2",
            )
        );
    }
}