// risingwave_stream/common/compact_chunk.rs

use itertools::Itertools;
use risingwave_common::array::stream_chunk::StreamChunkMut;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::row::RowExt;
use risingwave_common::types::DataType;

pub use super::change_buffer::InconsistencyBehavior;
use crate::common::change_buffer::output_kind::{RETRACT, UPSERT};
use crate::common::change_buffer::{ChangeBuffer, OutputKind};

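/// Compacts a sequence of [`StreamChunk`]s on the given key columns by folding the
/// per-key operations through a [`ChangeBuffer`], so that each key contributes at most
/// one change to the output.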
pub struct StreamChunkCompactor {
    /// The stream chunks to compact.
    chunks: Vec<StreamChunk>,
    /// Indices of the key columns used to group rows.
    key: Vec<usize>,
}

impl StreamChunkCompactor {
    pub fn new(key: Vec<usize>, chunks: Vec<StreamChunk>) -> Self {
        Self { chunks, key }
    }

    pub fn into_inner(self) -> (Vec<StreamChunk>, Vec<usize>) {
        (self.chunks, self.key)
    }

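    /// Compacts the chunks "inline": every row is first hidden, then only the rows
    /// belonging to the compacted records are made visible again, so the output keeps
    /// the layout of the input chunks. `KIND` decides how a per-key update is emitted:
    /// `UPSERT` keeps only the new row, while `RETRACT` keeps both rows and marks them
    /// as an `UpdateDelete`/`UpdateInsert` pair when they are adjacent in the same chunk.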
    pub fn into_compacted_chunks_inline<const KIND: OutputKind>(
        self,
        ib: InconsistencyBehavior,
    ) -> Vec<StreamChunk> {
        let (chunks, key_indices) = self.into_inner();

        let estimate_size = chunks.iter().map(|c| c.cardinality()).sum();
        let mut cb = ChangeBuffer::with_capacity(estimate_size).with_inconsistency_behavior(ib);

        let mut chunks = chunks.into_iter().map(StreamChunkMut::from).collect_vec();
        for chunk in &mut chunks {
            for (row, mut op_row) in chunk.to_rows_mut() {
                let op = op_row.op().normalize_update();
                let key = row.project(&key_indices);
                // Hide every row first; rows of records that survive compaction are
                // made visible again below.
                op_row.set_vis(false);
                op_row.set_op(op);
                cb.apply_op_row(op, key, op_row);
            }
        }

        // Re-show the rows of the compacted records.
        for record in cb.into_records() {
            match record {
                Record::Insert { mut new_row } => new_row.set_vis(true),
                Record::Delete { mut old_row } => old_row.set_vis(true),
                Record::Update {
                    mut old_row,
                    mut new_row,
                } => {
                    match KIND {
                        // In upsert mode, the new row alone represents the update.
                        UPSERT => new_row.set_vis(true),
                        RETRACT => {
                            old_row.set_vis(true);
                            new_row.set_vis(true);
                            // If the two rows are adjacent in the same chunk, mark them
                            // as an explicit update pair.
                            if old_row.same_chunk(&new_row)
                                && old_row.index() + 1 == new_row.index()
                            {
                                old_row.set_op(Op::UpdateDelete);
                                new_row.set_op(Op::UpdateInsert);
                            }
                        }
                    }
                }
            }
        }

        chunks.into_iter().map(|c| c.into()).collect()
    }

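    /// Compacts the chunks by reconstructing them: the compacted records are drained
    /// from the [`ChangeBuffer`] and re-materialized into fresh chunks of at most
    /// `chunk_size` rows with the given `data_types`.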
    pub fn into_compacted_chunks_reconstructed<const KIND: OutputKind>(
        self,
        chunk_size: usize,
        data_types: Vec<DataType>,
        ib: InconsistencyBehavior,
    ) -> Vec<StreamChunk> {
        let (chunks, key_indices) = self.into_inner();

        let estimate_size = chunks.iter().map(|c| c.cardinality()).sum();
        let mut cb = ChangeBuffer::with_capacity(estimate_size).with_inconsistency_behavior(ib);

        for chunk in &chunks {
            for record in chunk.records() {
                cb.apply_record(record, |&row| row.project(&key_indices));
            }
        }

        cb.into_chunks::<KIND>(data_types, chunk_size)
    }
}

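/// Compacts a single [`StreamChunk`] on `key_indices` and returns the single compacted
/// chunk. See [`StreamChunkCompactor::into_compacted_chunks_inline`].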
pub fn compact_chunk_inline<const KIND: OutputKind>(
    stream_chunk: StreamChunk,
    key_indices: &[usize],
    ib: InconsistencyBehavior,
) -> StreamChunk {
    StreamChunkCompactor::new(key_indices.to_vec(), vec![stream_chunk])
        .into_compacted_chunks_inline::<KIND>(ib)
        .into_iter()
        .exactly_one()
        .unwrap_or_else(|_| unreachable!("should have exactly one chunk in the output"))
}

#[cfg(test)]
mod tests {
    use risingwave_common::test_prelude::StreamChunkTestExt;

    use super::*;

    #[test]
    fn test_compact_chunk_inline_upsert() {
        test_compact_chunk_inline::<UPSERT>();
    }

    #[test]
    fn test_compact_chunk_inline_retract() {
        test_compact_chunk_inline::<RETRACT>();
    }

    fn test_compact_chunk_inline<const KIND: OutputKind>() {
        let key = [0, 1];
        let chunks = vec![
            StreamChunk::from_pretty(
                " I I I
                - 1 1 1
                + 1 1 2
                + 2 5 7
                + 4 9 2
                - 2 5 7
                + 2 5 5
                - 6 6 9
                + 6 6 9
                - 9 9 1",
            ),
            StreamChunk::from_pretty(
                " I I I
                - 6 6 9
                + 9 9 9
                - 9 9 4
                + 2 2 2
                + 9 9 1",
            ),
        ];
        let compactor = StreamChunkCompactor::new(key.to_vec(), chunks);
        let mut iter = compactor
            .into_compacted_chunks_inline::<KIND>(InconsistencyBehavior::Panic)
            .into_iter();

        let chunk = iter.next().unwrap().compact_vis();
        let expected = match KIND {
            RETRACT => StreamChunk::from_pretty(
                " I I I
                U- 1 1 1
                U+ 1 1 2
                + 4 9 2
                + 2 5 5
                - 6 6 9",
            ),
            UPSERT => StreamChunk::from_pretty(
                " I I I
                + 1 1 2
                + 4 9 2
                + 2 5 5
                - 6 6 9",
            ),
        };
        assert_eq!(chunk, expected, "{}", chunk.to_pretty());

        let chunk = iter.next().unwrap().compact_vis();
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 2 2 2",
            ),
            "{}",
            chunk.to_pretty()
        );

        assert_eq!(iter.next(), None);
    }

    #[test]
    fn test_compact_chunk_reconstructed_upsert() {
        test_compact_chunk_reconstructed::<UPSERT>();
    }

    #[test]
    fn test_compact_chunk_reconstructed_retract() {
        test_compact_chunk_reconstructed::<RETRACT>();
    }

    fn test_compact_chunk_reconstructed<const KIND: OutputKind>() {
        let key = [0, 1];
        let chunks = vec![
            StreamChunk::from_pretty(
                " I I I
                - 1 1 1
                + 1 1 2
                + 2 5 7
                + 4 9 2
                - 2 5 7
                + 2 5 5
                - 6 6 9
                + 6 6 9
                - 9 9 1",
            ),
            StreamChunk::from_pretty(
                " I I I
                - 6 6 9
                + 9 9 9
                - 9 9 4
                + 2 2 2
                + 9 9 1",
            ),
        ];
        let compactor = StreamChunkCompactor::new(key.to_vec(), chunks);

        let chunks = compactor.into_compacted_chunks_reconstructed::<KIND>(
            100,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            InconsistencyBehavior::Panic,
        );
        let chunk = chunks.into_iter().next().unwrap();
        let expected = match KIND {
            RETRACT => StreamChunk::from_pretty(
                " I I I
                U- 1 1 1
                U+ 1 1 2
                + 4 9 2
                + 2 5 5
                - 6 6 9
                + 2 2 2",
            ),
            UPSERT => StreamChunk::from_pretty(
                " I I I
                + 1 1 2
                + 4 9 2
                + 2 5 5
                - 6 6 9
                + 2 2 2",
            ),
        };
        assert_eq!(chunk, expected, "{}", chunk.to_pretty());
    }
}