// risingwave_stream/common/compact_chunk.rs

use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::hash::BuildHasherDefault;
use std::mem;
use std::sync::LazyLock;

use itertools::Itertools;
use prehash::{Passthru, Prehashed, new_prehashed_map_with_capacity};
use risingwave_common::array::stream_chunk::{OpRowMutRef, StreamChunkMut};
use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{Op, RowRef, StreamChunk};
use risingwave_common::log::LogSuppresser;
use risingwave_common::row::{Project, RowExt};
use risingwave_common::types::DataType;
use risingwave_common::util::hash_util::Crc32FastBuilder;

use crate::consistency::consistency_panic;
/// Compacts a batch of [`StreamChunk`]s by merging or cancelling operations on
/// rows that share the same key.
pub struct StreamChunkCompactor {
    // Chunks to compact, in arrival order.
    chunks: Vec<StreamChunk>,
    // Column indices that form the compaction key.
    key: Vec<usize>,
}
39
/// Holds up to the last two mutable op-row references seen for one key while
/// scanning chunks, so later rows can cancel or pair with earlier ones.
struct OpRowMutRefTuple<'a> {
    // The `Delete` half of a pending delete+insert pair, if any.
    before_prev: Option<OpRowMutRef<'a>>,
    // The most recent still-visible op-row for this key.
    prev: OpRowMutRef<'a>,
}
44
impl<'a> OpRowMutRefTuple<'a> {
    /// Feeds the next op-row for the same key into the tuple, mutating row
    /// visibility in place to cancel redundant operations.
    ///
    /// Returns `true` when the key's entry should be removed entirely (an
    /// insert was cancelled by a delete and nothing remains for this key).
    fn push(&mut self, mut curr: OpRowMutRef<'a>) -> bool {
        debug_assert!(self.prev.vis());
        match (self.prev.op(), curr.op()) {
            (Op::Insert, Op::Insert) => {
                consistency_panic!("receive duplicated insert on the stream");
                // When tolerating inconsistency: hide the earlier insert and
                // keep the latest one.
                self.prev.set_vis(false);
                self.prev = curr;
            }
            (Op::Delete, Op::Delete) => {
                consistency_panic!("receive duplicated delete on the stream");
                // When tolerating inconsistency: hide the earlier delete and
                // keep the latest one.
                self.prev.set_vis(false);
                self.prev = curr;
            }
            (Op::Insert, Op::Delete) => {
                // Insert followed by delete cancels out: hide both rows.
                self.prev.set_vis(false);
                curr.set_vis(false);
                self.prev = if let Some(prev) = self.before_prev.take() {
                    // Fall back to the earlier delete half of a pending pair.
                    prev
                } else {
                    // Nothing left for this key; signal removal of the entry.
                    return true;
                }
            }
            (Op::Delete, Op::Insert) => {
                // Delete followed by insert forms a potential update pair:
                // keep the delete in `before_prev` and the insert in `prev`.
                debug_assert!(
                    self.before_prev.is_none(),
                    "should have been taken in the above match arm"
                );
                self.before_prev = Some(mem::replace(&mut self.prev, curr));
            }
            // Ops are normalized via `normalize_update` before being pushed,
            // so `UpdateDelete`/`UpdateInsert` cannot reach here.
            _ => unreachable!(),
        };
        false
    }

    /// If this key currently holds a delete+insert pair, returns mutable
    /// references to `(delete, insert)` so the caller can rewrite them as an
    /// `UpdateDelete`/`UpdateInsert` pair.
    fn as_update_op(&mut self) -> Option<(&mut OpRowMutRef<'a>, &mut OpRowMutRef<'a>)> {
        self.before_prev.as_mut().map(|prev| {
            debug_assert_eq!(prev.op(), Op::Delete);
            debug_assert_eq!(self.prev.op(), Op::Insert);
            (prev, &mut self.prev)
        })
    }
}
100
/// Map from a pre-hashed key projection to the op-row tuple accumulated for
/// that key; the `Passthru` hasher reuses the precomputed hash instead of
/// rehashing the projected row.
type OpRowMap<'a, 'b> =
    HashMap<Prehashed<Project<'b, RowRef<'a>>>, OpRowMutRefTuple<'a>, BuildHasherDefault<Passthru>>;
103
/// The net logical operation for one key after compaction.
#[derive(Clone, Debug)]
pub enum RowOp<'a> {
    Insert(RowRef<'a>),
    Delete(RowRef<'a>),
    /// `(old_row, new_row)` pair.
    Update((RowRef<'a>, RowRef<'a>)),
}
// Rate-limits the "double insert/delete" warnings emitted by `RowOpMap`.
// NOTE(review): the name carries a typo ("SUPPERSSER"); left unchanged here to
// avoid breaking its other uses in this file.
static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
112
/// Accumulates the net [`RowOp`] per key while re-reading chunks; used by
/// [`StreamChunkCompactor::reconstructed_compacted_chunks`].
pub struct RowOpMap<'a, 'b> {
    map: HashMap<Prehashed<Project<'b, RowRef<'a>>>, RowOp<'a>, BuildHasherDefault<Passthru>>,
    // Whether to emit (rate-limited) warnings when the input stream violates
    // pk uniqueness (double insert / double delete).
    warn_for_inconsistent_stream: bool,
}
117
impl<'a, 'b> RowOpMap<'a, 'b> {
    /// Creates an empty map pre-sized for `estimate_size` keys.
    fn with_capacity(estimate_size: usize, warn_for_inconsistent_stream: bool) -> Self {
        Self {
            map: new_prehashed_map_with_capacity(estimate_size),
            warn_for_inconsistent_stream,
        }
    }

    /// Records an insert of row `v` under key `k`, merging with any operation
    /// already recorded for the same key.
    pub fn insert(&mut self, k: Prehashed<Project<'b, RowRef<'a>>>, v: RowRef<'a>) {
        let entry = self.map.entry(k);
        match entry {
            Entry::Vacant(e) => {
                e.insert(RowOp::Insert(v));
            }
            Entry::Occupied(mut e) => match e.get() {
                // Delete then insert on the same key becomes an update.
                RowOp::Delete(old_v) => {
                    e.insert(RowOp::Update((*old_v, v)));
                }
                // Insert after insert violates pk uniqueness: warn (rate
                // limited) and keep the latest insert.
                RowOp::Insert(_) => {
                    if self.warn_for_inconsistent_stream {
                        if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
                            tracing::warn!(
                                suppressed_count,
                                "double insert for the same pk, breaking the sink's pk constraint"
                            );
                        }
                    }
                    e.insert(RowOp::Insert(v));
                }
                // Update then insert: keep the original old row and take the
                // new insert as the update's new row.
                RowOp::Update((old_v, _)) => {
                    if self.warn_for_inconsistent_stream {
                        if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
                            tracing::warn!(
                                suppressed_count,
                                "double insert for the same pk, breaking the sink's pk constraint"
                            );
                        }
                    }
                    e.insert(RowOp::Update((*old_v, v)));
                }
            },
        }
    }

    /// Records a delete of row `v` under key `k`, cancelling or demoting any
    /// operation already recorded for the same key.
    pub fn delete(&mut self, k: Prehashed<Project<'b, RowRef<'a>>>, v: RowRef<'a>) {
        let entry = self.map.entry(k);
        match entry {
            Entry::Vacant(e) => {
                e.insert(RowOp::Delete(v));
            }
            Entry::Occupied(mut e) => match e.get() {
                // Insert then delete cancels out entirely.
                RowOp::Insert(_) => {
                    e.remove();
                }
                // Update then delete: only the delete of the original old row
                // survives.
                RowOp::Update((prev, _)) => {
                    e.insert(RowOp::Delete(*prev));
                }
                // Delete after delete violates pk uniqueness: warn (rate
                // limited) and keep the latest delete.
                RowOp::Delete(_) => {
                    if self.warn_for_inconsistent_stream {
                        if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
                            tracing::warn!(suppressed_count, "double delete for the same pk");
                        }
                    }
                    e.insert(RowOp::Delete(v));
                }
            },
        }
    }

    /// Drains the map into [`StreamChunk`]s of at most `chunk_size` rows.
    /// Updates whose old and new rows are identical are dropped. Output row
    /// order is unspecified (hash-map iteration order).
    pub fn into_chunks(self, chunk_size: usize, data_types: Vec<DataType>) -> Vec<StreamChunk> {
        let mut ret = vec![];
        let mut builder = StreamChunkBuilder::new(chunk_size, data_types);
        for (_, row_op) in self.map {
            match row_op {
                RowOp::Insert(row) => {
                    if let Some(c) = builder.append_record(Record::Insert { new_row: row }) {
                        ret.push(c)
                    }
                }
                RowOp::Delete(row) => {
                    if let Some(c) = builder.append_record(Record::Delete { old_row: row }) {
                        ret.push(c)
                    }
                }
                RowOp::Update((old, new)) => {
                    if old == new {
                        // Identical old/new rows: the update is a no-op.
                        continue;
                    }
                    if let Some(c) = builder.append_record(Record::Update {
                        old_row: old,
                        new_row: new,
                    }) {
                        ret.push(c)
                    }
                }
            }
        }
        // Flush the partially-filled tail chunk, if any.
        if let Some(c) = builder.take() {
            ret.push(c);
        }
        ret
    }
}
221
impl StreamChunkCompactor {
    /// Creates a compactor over `chunks`, keyed by the column indices in `key`.
    pub fn new(key: Vec<usize>, chunks: Vec<StreamChunk>) -> Self {
        Self { chunks, key }
    }

    /// Consumes the compactor, returning `(chunks, key_indices)`.
    pub fn into_inner(self) -> (Vec<StreamChunk>, Vec<usize>) {
        (self.chunks, self.key)
    }

    /// Compacts the chunks in place by toggling row visibility: redundant
    /// operations on the same key are hidden, and adjacent delete+insert pairs
    /// are rewritten as `UpdateDelete`/`UpdateInsert`. The chunks themselves
    /// (count and row layout) are preserved.
    pub fn into_compacted_chunks(self) -> impl Iterator<Item = StreamChunk> {
        let (chunks, key_indices) = self.into_inner();

        let estimate_size = chunks.iter().map(|c| c.cardinality()).sum();
        // Precompute one hash per row over the key columns so map lookups can
        // reuse it (`Prehashed` + `Passthru` hasher).
        let mut chunks: Vec<(Vec<u64>, StreamChunkMut)> = chunks
            .into_iter()
            .map(|c| {
                let hash_values = c
                    .data_chunk()
                    .get_hash_values(&key_indices, Crc32FastBuilder)
                    .into_iter()
                    .map(|hash| hash.value())
                    .collect_vec();
                (hash_values, StreamChunkMut::from(c))
            })
            .collect_vec();

        let mut op_row_map: OpRowMap<'_, '_> = new_prehashed_map_with_capacity(estimate_size);
        for (hash_values, c) in &mut chunks {
            for (row, mut op_row) in c.to_rows_mut() {
                // Normalize U-/U+ to plain Delete/Insert before pairing.
                op_row.set_op(op_row.op().normalize_update());
                let hash = hash_values[row.index()];
                let key = row.project(&key_indices);
                match op_row_map.entry(Prehashed::new(key, hash)) {
                    Entry::Vacant(v) => {
                        v.insert(OpRowMutRefTuple {
                            before_prev: None,
                            prev: op_row,
                        });
                    }
                    Entry::Occupied(mut o) => {
                        // `push` returns true when the key cancels out fully.
                        if o.get_mut().push(op_row) {
                            o.remove_entry();
                        }
                    }
                }
            }
        }
        for tuple in op_row_map.values_mut() {
            if let Some((prev, latest)) = tuple.as_update_op() {
                if prev.row_ref() == latest.row_ref() {
                    // Delete + insert of an identical row: drop both.
                    prev.set_vis(false);
                    latest.set_vis(false);
                } else if prev.same_chunk(latest) && prev.index() + 1 == latest.index() {
                    // Only adjacent rows within the same chunk can be
                    // rewritten as an update pair; otherwise they stay as a
                    // plain Delete + Insert.
                    prev.set_op(Op::UpdateDelete);
                    latest.set_op(Op::UpdateInsert);
                }
            }
        }
        chunks.into_iter().map(|(_, c)| c.into())
    }

    /// Fully compacts the input into freshly-built chunks: each key's
    /// operations collapse to a single net `Insert`/`Delete`/`Update`
    /// (see [`RowOpMap`]), then the result is re-emitted in chunks of at most
    /// `chunk_size` rows. Output row order is unspecified.
    pub fn reconstructed_compacted_chunks(
        self,
        chunk_size: usize,
        data_types: Vec<DataType>,
        warn_for_inconsistent_stream: bool,
    ) -> Vec<StreamChunk> {
        let (chunks, key_indices) = self.into_inner();

        let estimate_size = chunks.iter().map(|c| c.cardinality()).sum();
        // Split each chunk into (per-row key hashes, ops, data chunk).
        let chunks: Vec<(_, _, _)> = chunks
            .into_iter()
            .map(|c| {
                let (c, ops) = c.into_parts();
                let hash_values = c
                    .get_hash_values(&key_indices, Crc32FastBuilder)
                    .into_iter()
                    .map(|hash| hash.value())
                    .collect_vec();
                (hash_values, ops, c)
            })
            .collect_vec();
        let mut map = RowOpMap::with_capacity(estimate_size, warn_for_inconsistent_stream);
        for (hash_values, ops, c) in &chunks {
            for row in c.rows() {
                let hash = hash_values[row.index()];
                let op = ops[row.index()];
                let key = row.project(&key_indices);
                let k = Prehashed::new(key, hash);
                match op {
                    Op::Insert | Op::UpdateInsert => map.insert(k, row),
                    Op::Delete | Op::UpdateDelete => map.delete(k, row),
                }
            }
        }
        map.into_chunks(chunk_size, data_types)
    }
}
328
329pub fn merge_chunk_row(stream_chunk: StreamChunk, pk_indices: &[usize]) -> StreamChunk {
330 let compactor = StreamChunkCompactor::new(pk_indices.to_vec(), vec![stream_chunk]);
331 compactor.into_compacted_chunks().next().unwrap()
332}
333
#[cfg(test)]
mod tests {
    use risingwave_common::test_prelude::StreamChunkTestExt;

    use super::*;

    /// In-place compaction (`into_compacted_chunks`): chunk boundaries and
    /// row order are preserved; redundant ops are hidden and adjacent
    /// delete+insert pairs become U-/U+.
    #[test]
    fn test_merge_chunk_row() {
        let pk_indices = [0, 1];
        let chunks = vec![
            StreamChunk::from_pretty(
                " I I I
                - 1 1 1
                + 1 1 2
                + 2 5 7
                + 4 9 2
                - 2 5 7
                + 2 5 5
                - 6 6 9
                + 6 6 9
                - 9 9 1",
            ),
            StreamChunk::from_pretty(
                " I I I
                - 6 6 9
                + 9 9 9
                - 9 9 4
                + 2 2 2
                + 9 9 1",
            ),
        ];
        let compactor = StreamChunkCompactor::new(pk_indices.to_vec(), chunks);
        let mut iter = compactor.into_compacted_chunks();
        assert_eq!(
            iter.next().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                U- 1 1 1
                U+ 1 1 2
                + 4 9 2
                + 2 5 5
                - 6 6 9",
            )
        );
        assert_eq!(
            iter.next().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                + 2 2 2",
            )
        );

        assert_eq!(iter.next(), None);
    }

    /// Full reconstruction (`reconstructed_compacted_chunks`): all input
    /// chunks collapse into freshly-built output chunks with one net op per
    /// key (row order here depends on map iteration order).
    #[test]
    fn test_compact_chunk_row() {
        let pk_indices = [0, 1];
        let chunks = vec![
            StreamChunk::from_pretty(
                " I I I
                - 1 1 1
                + 1 1 2
                + 2 5 7
                + 4 9 2
                - 2 5 7
                + 2 5 5
                - 6 6 9
                + 6 6 9
                - 9 9 1",
            ),
            StreamChunk::from_pretty(
                " I I I
                - 6 6 9
                + 9 9 9
                - 9 9 4
                + 2 2 2
                + 9 9 1",
            ),
        ];
        let compactor = StreamChunkCompactor::new(pk_indices.to_vec(), chunks);

        let chunks = compactor.reconstructed_compacted_chunks(
            100,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            true,
        );
        assert_eq!(
            chunks.into_iter().next().unwrap(),
            StreamChunk::from_pretty(
                " I I I
                + 2 5 5
                - 6 6 9
                + 4 9 2
                U- 1 1 1
                U+ 1 1 2
                + 2 2 2",
            )
        );
    }
}