risingwave_stream/common/
compact_chunk.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::array::stream_chunk::StreamChunkMut;
17use risingwave_common::array::stream_record::Record;
18use risingwave_common::array::{Op, StreamChunk};
19use risingwave_common::row::RowExt;
20use risingwave_common::types::DataType;
21
22pub use super::change_buffer::InconsistencyBehavior;
23use crate::common::change_buffer::output_kind::{RETRACT, UPSERT};
24use crate::common::change_buffer::{ChangeBuffer, OutputKind};
25
26/// A helper to remove unnecessary changes in the stream chunks based on the key.
27pub struct StreamChunkCompactor {
28    chunks: Vec<StreamChunk>,
29    key: Vec<usize>,
30}
31
impl StreamChunkCompactor {
    /// Creates a compactor over `chunks`, with `key` being the indices of the
    /// columns that identify a row for compaction purposes.
    pub fn new(key: Vec<usize>, chunks: Vec<StreamChunk>) -> Self {
        Self { chunks, key }
    }

    /// Consumes the compactor, returning the chunks and the key indices back.
    pub fn into_inner(self) -> (Vec<StreamChunk>, Vec<usize>) {
        (self.chunks, self.key)
    }

    /// Remove unnecessary changes in the given chunks, by modifying the visibility and ops in place.
    ///
    /// The returned chunks keep the layout of the inputs: rows whose changes cancel out are
    /// merely hidden via visibility, and the ops of surviving rows may be rewritten. `KIND`
    /// decides how an update (a delete + insert pair on the same key) is emitted, and `ib`
    /// decides how the underlying [`ChangeBuffer`] reacts to inconsistent changes.
    pub fn into_compacted_chunks_inline<const KIND: OutputKind>(
        self,
        ib: InconsistencyBehavior,
    ) -> Vec<StreamChunk> {
        let (chunks, key_indices) = self.into_inner();

        // Pre-size the buffer with the total cardinality of the input chunks.
        let estimate_size = chunks.iter().map(|c| c.cardinality()).sum();
        let mut cb = ChangeBuffer::with_capacity(estimate_size).with_inconsistency_behavior(ib);

        // Convert to mutable chunks so that visibility and ops can be patched in place.
        let mut chunks = chunks.into_iter().map(StreamChunkMut::from).collect_vec();
        for chunk in &mut chunks {
            for (row, mut op_row) in chunk.to_rows_mut() {
                // Normalize `U-`/`U+` into plain ops, so the buffer only deals with
                // inserts and deletes; adjacent pairs may be rewritten back to update
                // ops after compaction (see below).
                let op = op_row.op().normalize_update();
                let key = row.project(&key_indices);
                // Make all rows invisible first.
                op_row.set_vis(false);
                op_row.set_op(op);
                // The buffer holds on to the `op_row` handle, so that rows surviving
                // compaction can be flipped back to visible afterwards.
                cb.apply_op_row(op, key, op_row);
            }
        }

        // For the rows that survive compaction, make them visible.
        for record in cb.into_records() {
            match record {
                Record::Insert { mut new_row } => new_row.set_vis(true),
                Record::Delete { mut old_row } => old_row.set_vis(true),
                Record::Update {
                    mut old_row,
                    mut new_row,
                } => {
                    match KIND {
                        // For upsert output, we only keep the new row with normalized op (`Insert`).
                        UPSERT => new_row.set_vis(true),
                        // For retract output, we keep both the old and new row, and rewrite the ops
                        // to `U-` and `U+` if they are adjacent.
                        RETRACT => {
                            old_row.set_vis(true);
                            new_row.set_vis(true);
                            if old_row.same_chunk(&new_row)
                                && old_row.index() + 1 == new_row.index()
                            {
                                old_row.set_op(Op::UpdateDelete);
                                new_row.set_op(Op::UpdateInsert);
                            }
                        }
                    }
                }
            }
        }

        // Convert the patched mutable chunks back into plain `StreamChunk`s.
        chunks.into_iter().map(|c| c.into()).collect()
    }

    /// Remove unnecessary changes in the given chunks, by filtering them out and constructing new
    /// chunks, with the given chunk size.
    ///
    /// Unlike [`Self::into_compacted_chunks_inline`], this does not preserve the input chunk
    /// layout: surviving changes are re-materialized by the [`ChangeBuffer`] into fresh chunks,
    /// with `chunk_size` controlling their size and `data_types` describing the columns.
    pub fn into_compacted_chunks_reconstructed<const KIND: OutputKind>(
        self,
        chunk_size: usize,
        data_types: Vec<DataType>,
        ib: InconsistencyBehavior,
    ) -> Vec<StreamChunk> {
        let (chunks, key_indices) = self.into_inner();

        // Pre-size the buffer with the total cardinality of the input chunks.
        let estimate_size = chunks.iter().map(|c| c.cardinality()).sum();
        let mut cb = ChangeBuffer::with_capacity(estimate_size).with_inconsistency_behavior(ib);

        // Feed every record into the buffer, keyed by the projection on `key_indices`.
        for chunk in &chunks {
            for record in chunk.records() {
                cb.apply_record(record, |&row| row.project(&key_indices));
            }
        }

        // Let the buffer construct the output chunks from the surviving records.
        cb.into_chunks::<KIND>(data_types, chunk_size)
    }
}
117
118/// Remove unnecessary changes in the given chunk, by modifying the visibility and ops in place.
119///
120/// This is the same as [`StreamChunkCompactor::into_compacted_chunks_inline`] with only one chunk.
121pub fn compact_chunk_inline<const KIND: OutputKind>(
122    stream_chunk: StreamChunk,
123    key_indices: &[usize],
124    ib: InconsistencyBehavior,
125) -> StreamChunk {
126    StreamChunkCompactor::new(key_indices.to_vec(), vec![stream_chunk])
127        .into_compacted_chunks_inline::<KIND>(ib)
128        .into_iter()
129        .exactly_one()
130        .unwrap_or_else(|_| unreachable!("should have exactly one chunk in the output"))
131}
132
#[cfg(test)]
mod tests {
    use risingwave_common::test_prelude::StreamChunkTestExt;

    use super::*;

    #[test]
    fn test_compact_chunk_inline_upsert() {
        test_compact_chunk_inline::<UPSERT>();
    }

    #[test]
    fn test_compact_chunk_inline_retract() {
        test_compact_chunk_inline::<RETRACT>();
    }

    /// Shared body for the inline-compaction tests, parameterized over the output kind.
    fn test_compact_chunk_inline<const KIND: OutputKind>() {
        // Compact on the first two columns; the third column acts as a value.
        let key = [0, 1];
        let chunks = vec![
            StreamChunk::from_pretty(
                " I I I
                - 1 1 1
                + 1 1 2
                + 2 5 7
                + 4 9 2
                - 2 5 7
                + 2 5 5
                - 6 6 9
                + 6 6 9
                - 9 9 1",
            ),
            StreamChunk::from_pretty(
                " I I I
                - 6 6 9
                + 9 9 9
                - 9 9 4
                + 2 2 2
                + 9 9 1",
            ),
        ];
        let compactor = StreamChunkCompactor::new(key.to_vec(), chunks);
        let mut iter = compactor
            .into_compacted_chunks_inline::<KIND>(InconsistencyBehavior::Panic)
            .into_iter();

        // NOTE: `compact_vis` is assumed to drop the rows hidden by compaction, so the
        // chunks can be compared structurally against the expected ones.
        let chunk = iter.next().unwrap().compact_vis();
        let expected = match KIND {
            RETRACT => StreamChunk::from_pretty(
                " I I I
                U- 1 1 1
                U+ 1 1 2
                + 4 9 2
                + 2 5 5
                - 6 6 9",
            ),
            UPSERT => StreamChunk::from_pretty(
                " I I I
                + 1 1 2
                + 4 9 2
                + 2 5 5
                - 6 6 9",
            ),
        };
        assert_eq!(chunk, expected, "{}", chunk.to_pretty());

        let chunk = iter.next().unwrap().compact_vis();
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 2 2 2",
            ),
            "{}",
            chunk.to_pretty()
        );

        // The inline variant preserves the number of input chunks — exactly two here.
        assert_eq!(iter.next(), None);
    }

    #[test]
    fn test_compact_chunk_reconstructed_upsert() {
        test_compact_chunk_reconstructed::<UPSERT>();
    }

    #[test]
    fn test_compact_chunk_reconstructed_retract() {
        test_compact_chunk_reconstructed::<RETRACT>();
    }

    /// Shared body for the reconstructed-compaction tests, parameterized over the output kind.
    fn test_compact_chunk_reconstructed<const KIND: OutputKind>() {
        let key = [0, 1];
        let chunks = vec![
            StreamChunk::from_pretty(
                " I I I
            - 1 1 1
            + 1 1 2
            + 2 5 7
            + 4 9 2
            - 2 5 7
            + 2 5 5
            - 6 6 9
            + 6 6 9
            - 9 9 1",
            ),
            StreamChunk::from_pretty(
                " I I I
            - 6 6 9
            + 9 9 9
            - 9 9 4
            + 2 2 2
            + 9 9 1",
            ),
        ];
        let compactor = StreamChunkCompactor::new(key.to_vec(), chunks);

        // A chunk size large enough that all surviving changes fit into a single chunk.
        let chunks = compactor.into_compacted_chunks_reconstructed::<KIND>(
            100,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            InconsistencyBehavior::Panic,
        );
        let chunk = chunks.into_iter().next().unwrap();
        let expected = match KIND {
            RETRACT => StreamChunk::from_pretty(
                "  I I I
                 U- 1 1 1
                 U+ 1 1 2
                 + 4 9 2
                 + 2 5 5
                 - 6 6 9
                 + 2 2 2",
            ),
            UPSERT => StreamChunk::from_pretty(
                "  I I I
                 + 1 1 2
                 + 4 9 2
                 + 2 5 5
                 - 6 6 9
                 + 2 2 2",
            ),
        };
        assert_eq!(chunk, expected, "{}", chunk.to_pretty());
    }
}
275}