risingwave_stream/common/
compact_chunk.rs1use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::hash::BuildHasherDefault;
18use std::mem;
19use std::sync::LazyLock;
20
21use itertools::Itertools;
22use prehash::{Passthru, Prehashed, new_prehashed_map_with_capacity};
23use risingwave_common::array::stream_chunk::{OpRowMutRef, StreamChunkMut};
24use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
25use risingwave_common::array::stream_record::Record;
26use risingwave_common::array::{Op, RowRef, StreamChunk};
27use risingwave_common::log::LogSuppresser;
28use risingwave_common::row::{Project, RowExt};
29use risingwave_common::types::DataType;
30use risingwave_common::util::hash_util::Crc32FastBuilder;
31
32use crate::consistency::consistency_panic;
33
/// How the compactor reacts when the input stream is inconsistent for a key
/// (e.g. two inserts or two deletes in a row).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InconsistencyBehavior {
    /// Escalate the problem through `consistency_panic!`.
    Panic,
    /// Emit a `tracing` warning (subject to a `LogSuppresser`) and keep going.
    Warn,
    /// Ignore the problem silently.
    Tolerate,
}
53
54impl InconsistencyBehavior {
55 #[track_caller]
57 fn report(self, msg: &str) {
58 match self {
59 InconsistencyBehavior::Panic => consistency_panic!("{}", msg),
60 InconsistencyBehavior::Warn => {
61 static LOG_SUPPERSSER: LazyLock<LogSuppresser> =
62 LazyLock::new(LogSuppresser::default);
63
64 if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
65 tracing::warn!(suppressed_count, "{}", msg);
66 }
67 }
68 InconsistencyBehavior::Tolerate => {}
69 }
70 }
71}
72
73pub struct StreamChunkCompactor {
75 chunks: Vec<StreamChunk>,
76 key: Vec<usize>,
77}
78
79struct OpRowMutRefTuple<'a> {
80 before_prev: Option<OpRowMutRef<'a>>,
81 prev: OpRowMutRef<'a>,
82}
83
84impl<'a> OpRowMutRefTuple<'a> {
85 fn push(&mut self, mut curr: OpRowMutRef<'a>, ib: InconsistencyBehavior) -> bool {
87 debug_assert!(self.prev.vis());
88 match (self.prev.op(), curr.op()) {
89 (Op::Insert, Op::Insert) => {
90 ib.report("receive duplicated insert on the stream");
91 self.prev.set_vis(false);
95 self.prev = curr;
96 }
97 (Op::Delete, Op::Delete) => {
98 ib.report("receive duplicated delete on the stream");
99 self.prev.set_vis(false);
103 self.prev = curr;
104 }
105 (Op::Insert, Op::Delete) => {
106 self.prev.set_vis(false);
108 curr.set_vis(false);
109 self.prev = if let Some(prev) = self.before_prev.take() {
110 prev
111 } else {
112 return true;
113 }
114 }
115 (Op::Delete, Op::Insert) => {
116 debug_assert!(
119 self.before_prev.is_none(),
120 "should have been taken in the above match arm"
121 );
122 self.before_prev = Some(mem::replace(&mut self.prev, curr));
123 }
124 _ => unreachable!(),
127 };
128 false
129 }
130
131 fn as_update_op(&mut self) -> Option<(&mut OpRowMutRef<'a>, &mut OpRowMutRef<'a>)> {
132 self.before_prev.as_mut().map(|prev| {
133 debug_assert_eq!(prev.op(), Op::Delete);
134 debug_assert_eq!(self.prev.op(), Op::Insert);
135 (prev, &mut self.prev)
136 })
137 }
138}
139
140type OpRowMap<'a, 'b> =
141 HashMap<Prehashed<Project<'b, RowRef<'a>>>, OpRowMutRefTuple<'a>, BuildHasherDefault<Passthru>>;
142
143#[derive(Clone, Debug)]
144pub enum RowOp<'a> {
145 Insert(RowRef<'a>),
146 Delete(RowRef<'a>),
147 Update((RowRef<'a>, RowRef<'a>)),
149}
150
151pub struct RowOpMap<'a, 'b> {
152 map: HashMap<Prehashed<Project<'b, RowRef<'a>>>, RowOp<'a>, BuildHasherDefault<Passthru>>,
153 ib: InconsistencyBehavior,
154}
155
156impl<'a, 'b> RowOpMap<'a, 'b> {
157 fn with_capacity(estimate_size: usize, ib: InconsistencyBehavior) -> Self {
158 Self {
159 map: new_prehashed_map_with_capacity(estimate_size),
160 ib,
161 }
162 }
163
164 pub fn insert(&mut self, k: Prehashed<Project<'b, RowRef<'a>>>, v: RowRef<'a>) {
165 let entry = self.map.entry(k);
166 match entry {
167 Entry::Vacant(e) => {
168 e.insert(RowOp::Insert(v));
169 }
170 Entry::Occupied(mut e) => match e.get() {
171 RowOp::Delete(old_v) => {
172 e.insert(RowOp::Update((*old_v, v)));
173 }
174 RowOp::Insert(_) => {
175 self.ib
176 .report("double insert for the same pk, breaking the sink's pk constraint");
177 e.insert(RowOp::Insert(v));
178 }
179 RowOp::Update((old_v, _)) => {
180 self.ib
181 .report("double insert for the same pk, breaking the sink's pk constraint");
182 e.insert(RowOp::Update((*old_v, v)));
183 }
184 },
185 }
186 }
187
188 pub fn delete(&mut self, k: Prehashed<Project<'b, RowRef<'a>>>, v: RowRef<'a>) {
189 let entry = self.map.entry(k);
190 match entry {
191 Entry::Vacant(e) => {
192 e.insert(RowOp::Delete(v));
193 }
194 Entry::Occupied(mut e) => match e.get() {
195 RowOp::Insert(_) => {
196 e.remove();
197 }
198 RowOp::Update((prev, _)) => {
199 e.insert(RowOp::Delete(*prev));
200 }
201 RowOp::Delete(_) => {
202 self.ib.report("double delete for the same pk");
203 e.insert(RowOp::Delete(v));
204 }
205 },
206 }
207 }
208
209 pub fn into_chunks(self, chunk_size: usize, data_types: Vec<DataType>) -> Vec<StreamChunk> {
210 let mut ret = vec![];
211 let mut builder = StreamChunkBuilder::new(chunk_size, data_types);
212 for (_, row_op) in self.map {
213 match row_op {
214 RowOp::Insert(row) => {
215 if let Some(c) = builder.append_record(Record::Insert { new_row: row }) {
216 ret.push(c)
217 }
218 }
219 RowOp::Delete(row) => {
220 if let Some(c) = builder.append_record(Record::Delete { old_row: row }) {
221 ret.push(c)
222 }
223 }
224 RowOp::Update((old, new)) => {
225 if old == new {
226 continue;
227 }
228 if let Some(c) = builder.append_record(Record::Update {
229 old_row: old,
230 new_row: new,
231 }) {
232 ret.push(c)
233 }
234 }
235 }
236 }
237 if let Some(c) = builder.take() {
238 ret.push(c);
239 }
240 ret
241 }
242}
243
244impl StreamChunkCompactor {
245 pub fn new(key: Vec<usize>, chunks: Vec<StreamChunk>) -> Self {
246 Self { chunks, key }
247 }
248
249 pub fn into_inner(self) -> (Vec<StreamChunk>, Vec<usize>) {
250 (self.chunks, self.key)
251 }
252
253 pub fn into_compacted_chunks(
260 self,
261 ib: InconsistencyBehavior,
262 ) -> impl Iterator<Item = StreamChunk> {
263 let (chunks, key_indices) = self.into_inner();
264
265 let estimate_size = chunks.iter().map(|c| c.cardinality()).sum();
266 let mut chunks: Vec<(Vec<u64>, StreamChunkMut)> = chunks
267 .into_iter()
268 .map(|c| {
269 let hash_values = c
270 .data_chunk()
271 .get_hash_values(&key_indices, Crc32FastBuilder)
272 .into_iter()
273 .map(|hash| hash.value())
274 .collect_vec();
275 (hash_values, StreamChunkMut::from(c))
276 })
277 .collect_vec();
278
279 let mut op_row_map: OpRowMap<'_, '_> = new_prehashed_map_with_capacity(estimate_size);
280 for (hash_values, c) in &mut chunks {
281 for (row, mut op_row) in c.to_rows_mut() {
282 op_row.set_op(op_row.op().normalize_update());
283 let hash = hash_values[row.index()];
284 let key = row.project(&key_indices);
285 match op_row_map.entry(Prehashed::new(key, hash)) {
286 Entry::Vacant(v) => {
287 v.insert(OpRowMutRefTuple {
288 before_prev: None,
289 prev: op_row,
290 });
291 }
292 Entry::Occupied(mut o) => {
293 if o.get_mut().push(op_row, ib) {
294 o.remove_entry();
295 }
296 }
297 }
298 }
299 }
300 for tuple in op_row_map.values_mut() {
301 if let Some((prev, latest)) = tuple.as_update_op() {
302 if prev.row_ref() == latest.row_ref() {
303 prev.set_vis(false);
304 latest.set_vis(false);
305 } else if prev.same_chunk(latest) && prev.index() + 1 == latest.index() {
306 prev.set_op(Op::UpdateDelete);
308 latest.set_op(Op::UpdateInsert);
309 }
310 }
311 }
312 chunks.into_iter().map(|(_, c)| c.into())
313 }
314
315 pub fn reconstructed_compacted_chunks(
317 self,
318 chunk_size: usize,
319 data_types: Vec<DataType>,
320 ib: InconsistencyBehavior,
321 ) -> Vec<StreamChunk> {
322 let (chunks, key_indices) = self.into_inner();
323
324 let estimate_size = chunks.iter().map(|c| c.cardinality()).sum();
325 let chunks: Vec<(_, _, _)> = chunks
326 .into_iter()
327 .map(|c| {
328 let (c, ops) = c.into_parts();
329 let hash_values = c
330 .get_hash_values(&key_indices, Crc32FastBuilder)
331 .into_iter()
332 .map(|hash| hash.value())
333 .collect_vec();
334 (hash_values, ops, c)
335 })
336 .collect_vec();
337 let mut map = RowOpMap::with_capacity(estimate_size, ib);
338 for (hash_values, ops, c) in &chunks {
339 for row in c.rows() {
340 let hash = hash_values[row.index()];
341 let op = ops[row.index()];
342 let key = row.project(&key_indices);
343 let k = Prehashed::new(key, hash);
344 match op {
345 Op::Insert | Op::UpdateInsert => map.insert(k, row),
346 Op::Delete | Op::UpdateDelete => map.delete(k, row),
347 }
348 }
349 }
350 map.into_chunks(chunk_size, data_types)
351 }
352}
353
354pub fn merge_chunk_row(
355 stream_chunk: StreamChunk,
356 pk_indices: &[usize],
357 ib: InconsistencyBehavior,
358) -> StreamChunk {
359 let compactor = StreamChunkCompactor::new(pk_indices.to_vec(), vec![stream_chunk]);
360 compactor.into_compacted_chunks(ib).next().unwrap()
361}
362
#[cfg(test)]
mod tests {
    use risingwave_common::test_prelude::StreamChunkTestExt;

    use super::*;

    /// Key columns used by every test in this module.
    const PK_INDICES: [usize; 2] = [0, 1];

    /// Two input chunks covering several cases:
    /// - an adjacent update `(1, 1)`;
    /// - an insert cancelled by a delete `(2, 5)`;
    /// - an insert/delete round trip back to a plain delete `(6, 6)`;
    /// - a cross-chunk sequence that nets out to nothing `(9, 9)`.
    fn input_chunks() -> Vec<StreamChunk> {
        vec![
            StreamChunk::from_pretty(
                " I I I
                - 1 1 1
                + 1 1 2
                + 2 5 7
                + 4 9 2
                - 2 5 7
                + 2 5 5
                - 6 6 9
                + 6 6 9
                - 9 9 1",
            ),
            StreamChunk::from_pretty(
                " I I I
                - 6 6 9
                + 9 9 9
                - 9 9 4
                + 2 2 2
                + 9 9 1",
            ),
        ]
    }

    #[test]
    fn test_merge_chunk_row() {
        let compactor = StreamChunkCompactor::new(PK_INDICES.to_vec(), input_chunks());
        let mut compacted = compactor.into_compacted_chunks(InconsistencyBehavior::Panic);

        let first = compacted.next().unwrap().compact();
        assert_eq!(
            first,
            StreamChunk::from_pretty(
                " I I I
                U- 1 1 1
                U+ 1 1 2
                + 4 9 2
                + 2 5 5
                - 6 6 9",
            )
        );

        let second = compacted.next().unwrap().compact();
        assert_eq!(
            second,
            StreamChunk::from_pretty(
                " I I I
                + 2 2 2",
            )
        );

        assert_eq!(compacted.next(), None);
    }

    #[test]
    fn test_compact_chunk_row() {
        let compactor = StreamChunkCompactor::new(PK_INDICES.to_vec(), input_chunks());

        let rebuilt = compactor.reconstructed_compacted_chunks(
            100,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            InconsistencyBehavior::Panic,
        );
        let first = rebuilt.into_iter().next().unwrap();
        assert_eq!(
            first,
            StreamChunk::from_pretty(
                " I I I
                + 2 5 5
                - 6 6 9
                + 4 9 2
                U- 1 1 1
                U+ 1 1 2
                + 2 2 2",
            )
        );
    }
}
463}