use std::fmt::Display;
use std::mem::size_of;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::{fmt, mem};

use either::Either;
use enum_as_inner::EnumAsInner;
use itertools::Itertools;
use rand::prelude::SmallRng;
use rand::{Rng, SeedableRng};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::data::{PbOp, PbStreamChunk};

use super::stream_chunk_builder::StreamChunkBuilder;
use super::{ArrayImpl, ArrayRef, ArrayResult, DataChunkTestExt, RowRef};
use crate::array::DataChunk;
use crate::bitmap::{Bitmap, BitmapBuilder};
use crate::catalog::Schema;
use crate::field_generator::VarcharProperty;
use crate::row::Row;
use crate::types::{DataType, DefaultOrdered, ToText};

#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash, EnumAsInner)]
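/// The operation of a row in a [`StreamChunk`].
///
/// `UpdateDelete` and `UpdateInsert` are expected to appear as an adjacent pair, carrying the old
/// and the new value of an updated row in that order.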
pub enum Op {
    Insert,
    Delete,
    UpdateDelete,
    UpdateInsert,
}

impl Op {
    pub fn to_protobuf(self) -> PbOp {
        match self {
            Op::Insert => PbOp::Insert,
            Op::Delete => PbOp::Delete,
            Op::UpdateInsert => PbOp::UpdateInsert,
            Op::UpdateDelete => PbOp::UpdateDelete,
        }
    }

    pub fn from_protobuf(prost: &i32) -> ArrayResult<Op> {
        let op = match PbOp::try_from(*prost) {
            Ok(PbOp::Insert) => Op::Insert,
            Ok(PbOp::Delete) => Op::Delete,
            Ok(PbOp::UpdateInsert) => Op::UpdateInsert,
            Ok(PbOp::UpdateDelete) => Op::UpdateDelete,
            Ok(PbOp::Unspecified) => unreachable!(),
            Err(_) => bail!("No such op type"),
        };
        Ok(op)
    }

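    /// Converts an update op to its plain counterpart: `UpdateDelete` becomes `Delete` and
    /// `UpdateInsert` becomes `Insert`; plain ops are returned unchanged.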
    pub fn normalize_update(self) -> Op {
        match self {
            Op::Insert => Op::Insert,
            Op::Delete => Op::Delete,
            Op::UpdateDelete => Op::Delete,
            Op::UpdateInsert => Op::Insert,
        }
    }

    pub fn to_i16(self) -> i16 {
        match self {
            Op::Insert => 1,
            Op::Delete => 2,
            Op::UpdateInsert => 3,
            Op::UpdateDelete => 4,
        }
    }

    pub fn to_varchar(self) -> String {
        match self {
            Op::Insert => "Insert",
            Op::Delete => "Delete",
            Op::UpdateInsert => "UpdateInsert",
            Op::UpdateDelete => "UpdateDelete",
        }
        .to_owned()
    }
}

#[derive(Clone, PartialEq)]
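/// A `StreamChunk` is a [`DataChunk`] with an additional [`Op`] per row: each row carries the
/// change (insert, delete or update) it represents in the stream.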
pub struct StreamChunk {
    ops: Arc<[Op]>,
    data: DataChunk,
}

impl Default for StreamChunk {
    fn default() -> Self {
        Self {
            ops: Arc::new([]),
            data: DataChunk::new(vec![], 0),
        }
    }
}

impl StreamChunk {
    pub fn new(ops: impl Into<Arc<[Op]>>, columns: Vec<ArrayRef>) -> Self {
        let ops = ops.into();
        let visibility = Bitmap::ones(ops.len());
        Self::with_visibility(ops, columns, visibility)
    }

    pub fn with_visibility(
        ops: impl Into<Arc<[Op]>>,
        columns: Vec<ArrayRef>,
        visibility: Bitmap,
    ) -> Self {
        let ops = ops.into();
        for col in &columns {
            assert_eq!(col.len(), ops.len());
        }
        let data = DataChunk::new(columns, visibility);
        StreamChunk { ops, data }
    }

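    /// Builds a `StreamChunk` from `(Op, Row)` pairs.
    ///
    /// Panics if `rows` is empty.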
    pub fn from_rows(rows: &[(Op, impl Row)], data_types: &[DataType]) -> Self {
        let mut builder = StreamChunkBuilder::unlimited(data_types.to_vec(), Some(rows.len()));

        for (op, row) in rows {
            let none = builder.append_row(*op, row);
            debug_assert!(none.is_none());
        }

        builder.take().expect("chunk should not be empty")
    }

    pub fn data_chunk(&self) -> &DataChunk {
        &self.data
    }

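    /// Removes the rows hidden by the visibility bitmap, returning a chunk whose visibility is
    /// all ones. Returns `self` directly if it is already compacted.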
    pub fn compact(self) -> Self {
        if self.is_compacted() {
            return self;
        }

        let (ops, columns, visibility) = self.into_inner();

        let cardinality = visibility
            .iter()
            .fold(0, |vis_cnt, vis| vis_cnt + vis as usize);
        let columns: Vec<_> = columns
            .into_iter()
            .map(|col| col.compact(&visibility, cardinality).into())
            .collect();
        let mut new_ops = Vec::with_capacity(cardinality);
        for idx in visibility.iter_ones() {
            new_ops.push(ops[idx]);
        }
        StreamChunk::new(new_ops, columns)
    }

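    /// Splits the chunk into multiple chunks of at most `size` rows each. `UpdateDelete` and
    /// `UpdateInsert` pairs are kept in the same output chunk by [`StreamChunkBuilder`]
    /// (see `test_split_2` below).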
    pub fn split(&self, size: usize) -> Vec<Self> {
        let mut builder = StreamChunkBuilder::new(size, self.data_types());
        let mut outputs = Vec::new();

        for (op, row) in self.rows() {
            if let Some(chunk) = builder.append_row(op, row) {
                outputs.push(chunk);
            }
        }
        if let Some(output) = builder.take() {
            outputs.push(output);
        }

        outputs
    }

    pub fn into_parts(self) -> (DataChunk, Arc<[Op]>) {
        (self.data, self.ops)
    }

    pub fn from_parts(ops: impl Into<Arc<[Op]>>, data_chunk: DataChunk) -> Self {
        let (columns, vis) = data_chunk.into_parts();
        Self::with_visibility(ops, columns, vis)
    }

    pub fn into_inner(self) -> (Arc<[Op]>, Vec<ArrayRef>, Bitmap) {
        let (columns, vis) = self.data.into_parts();
        (self.ops, columns, vis)
    }

    pub fn to_protobuf(&self) -> PbStreamChunk {
        if !self.is_compacted() {
            return self.clone().compact().to_protobuf();
        }
        PbStreamChunk {
            cardinality: self.cardinality() as u32,
            ops: self.ops.iter().map(|op| op.to_protobuf() as i32).collect(),
            columns: self.columns().iter().map(|col| col.to_protobuf()).collect(),
        }
    }

    pub fn from_protobuf(prost: &PbStreamChunk) -> ArrayResult<Self> {
        let cardinality = prost.get_cardinality() as usize;
        let mut ops = Vec::with_capacity(cardinality);
        for op in prost.get_ops() {
            ops.push(Op::from_protobuf(op)?);
        }
        let mut columns = vec![];
        for column in prost.get_columns() {
            columns.push(ArrayImpl::from_protobuf(column, cardinality)?.into());
        }
        Ok(StreamChunk::new(ops, columns))
    }

    pub fn ops(&self) -> &[Op] {
        &self.ops
    }

    pub fn to_pretty(&self) -> impl Display + use<> {
        self.to_pretty_inner(None)
    }

    pub fn to_pretty_with_schema(&self, schema: &Schema) -> impl Display + use<> {
        self.to_pretty_inner(Some(schema))
    }

    fn to_pretty_inner(&self, schema: Option<&Schema>) -> impl Display + use<> {
        use comfy_table::{Cell, CellAlignment, Table};

        if self.cardinality() == 0 {
            return Either::Left("(empty)");
        }

        let mut table = Table::new();
        table.load_preset(DataChunk::PRETTY_TABLE_PRESET);

        if let Some(schema) = schema {
            assert_eq!(self.dimension(), schema.len());
            let cells = std::iter::once(String::new())
                .chain(schema.fields().iter().map(|f| f.name.clone()));
            table.set_header(cells);
        }

        for (op, row_ref) in self.rows() {
            let mut cells = Vec::with_capacity(row_ref.len() + 1);
            cells.push(
                Cell::new(match op {
                    Op::Insert => "+",
                    Op::Delete => "-",
                    Op::UpdateDelete => "U-",
                    Op::UpdateInsert => "U+",
                })
                .set_alignment(CellAlignment::Right),
            );
            for datum in row_ref.iter() {
                let str = match datum {
                    None => "".to_owned(),
                    Some(scalar) => scalar.to_text(),
                };
                cells.push(Cell::new(str));
            }
            table.add_row(cells);
        }

        Either::Right(table)
    }

    pub fn project(&self, indices: &[usize]) -> Self {
        Self {
            ops: self.ops.clone(),
            data: self.data.project(indices),
        }
    }

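    /// Hides adjacent `(Delete | UpdateDelete, Insert | UpdateInsert)` pairs whose old and new
    /// rows are identical, since such a pair is a no-op update. Only the visibility bitmap is
    /// changed; ops and column data stay untouched.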
    pub fn eliminate_adjacent_noop_update(self) -> Self {
        let len = self.data_chunk().capacity();
        let mut c: StreamChunkMut = self.into();
        let mut prev_r = None;
        for curr in 0..len {
            if !c.vis(curr) {
                continue;
            }
            if let Some(prev) = prev_r {
                if matches!(c.op(prev), Op::UpdateDelete | Op::Delete)
                    && matches!(c.op(curr), Op::UpdateInsert | Op::Insert)
                    && c.row_ref(prev) == c.row_ref(curr)
                {
                    c.set_vis(prev, false);
                    c.set_vis(curr, false);
                    prev_r = None;
                } else {
                    prev_r = Some(curr)
                }
            } else {
                prev_r = Some(curr);
            }
        }
        c.into()
    }

    pub fn project_with_vis(&self, indices: &[usize], vis: Bitmap) -> Self {
        Self {
            ops: self.ops.clone(),
            data: self.data.project_with_vis(indices, vis),
        }
    }

    pub fn clone_with_vis(&self, vis: Bitmap) -> Self {
        Self {
            ops: self.ops.clone(),
            data: self.data.with_visibility(vis),
        }
    }

    pub fn compute_rate_limit_chunk_permits(&self) -> u64 {
        self.capacity() as _
    }
}

impl Deref for StreamChunk {
    type Target = DataChunk;

    fn deref(&self) -> &Self::Target {
        &self.data
    }
}

impl DerefMut for StreamChunk {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.data
    }
}

impl From<DataChunk> for StreamChunk {
    fn from(data: DataChunk) -> Self {
        Self::from_parts(vec![Op::Insert; data.capacity()], data)
    }
}

impl fmt::Debug for StreamChunk {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if f.alternate() {
            write!(
                f,
                "StreamChunk {{ cardinality: {}, capacity: {}, data:\n{}\n }}",
                self.cardinality(),
                self.capacity(),
                self.to_pretty()
            )
        } else {
            f.debug_struct("StreamChunk")
                .field("cardinality", &self.cardinality())
                .field("capacity", &self.capacity())
                .finish_non_exhaustive()
        }
    }
}

impl EstimateSize for StreamChunk {
    fn estimated_heap_size(&self) -> usize {
        self.data.estimated_heap_size() + self.ops.len() * size_of::<Op>()
    }
}

enum OpsMutState {
    ArcRef(Arc<[Op]>),
    Mut(Vec<Op>),
}

impl OpsMutState {
    const UNDEFINED: Self = Self::Mut(Vec::new());
}

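/// A mutable view of an ops array with copy-on-write semantics: the shared `Arc<[Op]>` is cloned
/// into a `Vec<Op>` only when the first `set` happens.
///
/// Illustrative usage (hypothetical snippet within this crate):
///
/// ```ignore
/// let mut ops = OpsMut::new(Arc::from(vec![Op::Insert, Op::Insert]));
/// ops.set(1, Op::Delete);          // triggers the copy-on-write clone
/// let ops: Arc<[Op]> = ops.into(); // convert back to the shared form
/// ```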
pub struct OpsMut {
    state: OpsMutState,
}

impl OpsMut {
    pub fn new(ops: Arc<[Op]>) -> Self {
        Self {
            state: OpsMutState::ArcRef(ops),
        }
    }

    pub fn len(&self) -> usize {
        match &self.state {
            OpsMutState::ArcRef(v) => v.len(),
            OpsMutState::Mut(v) => v.len(),
        }
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    pub fn set(&mut self, n: usize, val: Op) {
        debug_assert!(n < self.len());
        if let OpsMutState::Mut(v) = &mut self.state {
            v[n] = val;
        } else {
            let state = mem::replace(&mut self.state, OpsMutState::UNDEFINED);
            let mut v = match state {
                OpsMutState::ArcRef(v) => v.to_vec(),
                OpsMutState::Mut(_) => unreachable!(),
            };
            v[n] = val;
            self.state = OpsMutState::Mut(v);
        }
    }

    pub fn get(&self, n: usize) -> Op {
        debug_assert!(n < self.len());
        match &self.state {
            OpsMutState::ArcRef(v) => v[n],
            OpsMutState::Mut(v) => v[n],
        }
    }
}

impl From<OpsMut> for Arc<[Op]> {
    fn from(v: OpsMut) -> Self {
        match v.state {
            OpsMutState::ArcRef(a) => a,
            OpsMutState::Mut(v) => v.into(),
        }
    }
}

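/// A mutable form of [`StreamChunk`]: ops and visibility can be updated in place while the column
/// data stays shared and read-only. Convert back with `StreamChunk::from(chunk_mut)`.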
pub struct StreamChunkMut {
    columns: Arc<[ArrayRef]>,
    ops: OpsMut,
    vis: BitmapBuilder,
}

impl From<StreamChunk> for StreamChunkMut {
    fn from(c: StreamChunk) -> Self {
        let (c, ops) = c.into_parts();
        let (columns, vis) = c.into_parts_v2();
        Self {
            columns,
            ops: OpsMut::new(ops),
            vis: vis.into(),
        }
    }
}

impl From<StreamChunkMut> for StreamChunk {
    fn from(c: StreamChunkMut) -> Self {
        StreamChunk::from_parts(c.ops, DataChunk::from_parts(c.columns, c.vis.finish()))
    }
}

pub struct OpRowMutRef<'a> {
    c: &'a mut StreamChunkMut,
    i: usize,
}

impl OpRowMutRef<'_> {
    pub fn index(&self) -> usize {
        self.i
    }

    pub fn vis(&self) -> bool {
        self.c.vis.is_set(self.i)
    }

    pub fn op(&self) -> Op {
        self.c.ops.get(self.i)
    }

    pub fn set_vis(&mut self, val: bool) {
        self.c.set_vis(self.i, val);
    }

    pub fn set_op(&mut self, val: Op) {
        self.c.set_op(self.i, val);
    }

    pub fn row_ref(&self) -> RowRef<'_> {
        RowRef::with_columns(self.c.columns(), self.i)
    }

    pub fn same_chunk(&self, other: &Self) -> bool {
        std::ptr::eq(self.c, other.c)
    }
}

impl StreamChunkMut {
    pub fn capacity(&self) -> usize {
        self.vis.len()
    }

    pub fn vis(&self, i: usize) -> bool {
        self.vis.is_set(i)
    }

    pub fn op(&self, i: usize) -> Op {
        self.ops.get(i)
    }

    pub fn row_ref(&self, i: usize) -> RowRef<'_> {
        RowRef::with_columns(self.columns(), i)
    }

    pub fn set_vis(&mut self, n: usize, val: bool) {
        self.vis.set(n, val);
    }

    pub fn set_op(&mut self, n: usize, val: Op) {
        self.ops.set(n, val);
    }

    pub fn columns(&self) -> &[ArrayRef] {
        &self.columns
    }

    pub fn to_rows_mut(&mut self) -> impl Iterator<Item = (RowRef<'_>, OpRowMutRef<'_>)> {
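        // SAFETY (assumed invariant): each `OpRowMutRef` only mutates the ops and visibility of
        // its own row through the raw pointer, while the paired `RowRef` only reads the shared
        // column arrays, so the mutable aliasing created below is not observed through
        // conflicting reads and writes of the same data.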
        unsafe {
            (0..self.vis.len())
                .filter(|i| self.vis.is_set(*i))
                .map(|i| {
                    let p = self as *const StreamChunkMut;
                    let p = p as *mut StreamChunkMut;
                    (
                        RowRef::with_columns(self.columns(), i),
                        OpRowMutRef { c: &mut *p, i },
                    )
                })
        }
    }
}

#[easy_ext::ext(StreamChunkTestExt)]
impl StreamChunk {
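    /// Parses a chunk from a human-readable text format, for use in tests.
    ///
    /// The first line is the schema line accepted by [`DataChunk::from_pretty`]; every following
    /// line starts with an op token (`+`, `-`, `U+`, `U-`) followed by the column values, where
    /// `.` denotes NULL and a trailing `D` marks the row as invisible, e.g.
    ///
    /// ```text
    ///  I I
    /// + 1 6
    /// - 2 .
    /// ```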
    pub fn from_pretty(s: &str) -> Self {
        let mut chunk_str = String::new();
        let mut ops = vec![];

        let (header, body) = match s.split_once('\n') {
            Some(pair) => pair,
            None => {
                return StreamChunk {
                    ops: Arc::new([]),
                    data: DataChunk::from_pretty(s),
                };
            }
        };
        chunk_str.push_str(header);
        chunk_str.push('\n');

        for line in body.split_inclusive('\n') {
            if line.trim_start().is_empty() {
                continue;
            }
            let (op, row) = line
                .trim_start()
                .split_once(|c: char| c.is_ascii_whitespace())
                .ok_or_else(|| panic!("missing operation: {line:?}"))
                .unwrap();
            ops.push(match op {
                "+" => Op::Insert,
                "-" => Op::Delete,
                "U+" => Op::UpdateInsert,
                "U-" => Op::UpdateDelete,
                t => panic!("invalid op: {t:?}"),
            });
            chunk_str.push_str(row);
        }
        StreamChunk {
            ops: ops.into(),
            data: DataChunk::from_pretty(&chunk_str),
        }
    }

    pub fn valid(&self) -> bool {
        let len = self.ops.len();
        let data = &self.data;
        data.visibility().len() == len && data.columns().iter().all(|col| col.len() == len)
    }

    pub fn concat(chunks: Vec<StreamChunk>) -> StreamChunk {
        let data_types = chunks[0].data_types();
        let size = chunks.iter().map(|c| c.cardinality()).sum::<usize>();

        let mut builder = StreamChunkBuilder::unlimited(data_types, Some(size));

        for chunk in chunks {
            for (op, row) in chunk.rows() {
                let none = builder.append_row(op, row);
                debug_assert!(none.is_none());
            }
        }

        builder.take().expect("chunk should not be empty")
    }

    pub fn sort_rows(self) -> Self {
        if self.capacity() == 0 {
            return self;
        }
        let rows = self.rows().collect_vec();
        let mut idx = (0..self.capacity()).collect_vec();
        idx.sort_by_key(|&i| {
            let (op, row_ref) = rows[i];
            (op, DefaultOrdered(row_ref))
        });
        StreamChunk {
            ops: idx.iter().map(|&i| self.ops[i]).collect(),
            data: self.data.reorder_rows(&idx),
        }
    }

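    /// Generates `num_of_chunks` chunks of `chunk_size` randomly-filled insert rows each, for
    /// tests and benchmarks. See [`gen_stream_chunks_inner`](Self::gen_stream_chunks_inner) for
    /// control over visibility and the insert/delete ratio.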
    pub fn gen_stream_chunks(
        num_of_chunks: usize,
        chunk_size: usize,
        data_types: &[DataType],
        varchar_properties: &VarcharProperty,
    ) -> Vec<StreamChunk> {
        Self::gen_stream_chunks_inner(
            num_of_chunks,
            chunk_size,
            data_types,
            varchar_properties,
            1.0,
            1.0,
        )
    }

    pub fn gen_stream_chunks_inner(
        num_of_chunks: usize,
        chunk_size: usize,
        data_types: &[DataType],
        varchar_properties: &VarcharProperty,
        visibility_percent: f64,
        inserts_percent: f64,
    ) -> Vec<StreamChunk> {
        let ops = if inserts_percent == 0.0 {
            vec![Op::Delete; chunk_size]
        } else if inserts_percent == 1.0 {
            vec![Op::Insert; chunk_size]
        } else {
            let mut rng = SmallRng::from_seed([0; 32]);
            let mut ops = vec![];
            for _ in 0..chunk_size {
                ops.push(if rng.random_bool(inserts_percent) {
                    Op::Insert
                } else {
                    Op::Delete
                });
            }
            ops
        };
        DataChunk::gen_data_chunks(
            num_of_chunks,
            chunk_size,
            data_types,
            varchar_properties,
            visibility_percent,
        )
        .into_iter()
        .map(|chunk| StreamChunk::from_parts(ops.clone(), chunk))
        .collect()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_to_pretty_string() {
        let chunk = StreamChunk::from_pretty(
            " I I
             + 1 6
             - 2 .
            U- 3 7
            U+ 4 .",
        );
        assert_eq!(
            chunk.to_pretty().to_string(),
            "\
+----+---+---+
|  + | 1 | 6 |
|  - | 2 |   |
| U- | 3 | 7 |
| U+ | 4 |   |
+----+---+---+"
        );
    }

    #[test]
    fn test_split_1() {
        let chunk = StreamChunk::from_pretty(
            " I I
             + 1 6
             - 2 .
            U- 3 7
            U+ 4 .",
        );
        let results = chunk.split(2);
        assert_eq!(2, results.len());
        assert_eq!(
            results[0].to_pretty().to_string(),
            "\
+---+---+---+
| + | 1 | 6 |
| - | 2 |   |
+---+---+---+"
        );
        assert_eq!(
            results[1].to_pretty().to_string(),
            "\
+----+---+---+
| U- | 3 | 7 |
| U+ | 4 |   |
+----+---+---+"
        );
    }

    #[test]
    fn test_split_2() {
        let chunk = StreamChunk::from_pretty(
            " I I
             + 1 6
            U- 3 7
            U+ 4 .
             - 2 .",
        );
        let results = chunk.split(2);
        assert_eq!(2, results.len());
        assert_eq!(
            results[0].to_pretty().to_string(),
            "\
+----+---+---+
|  + | 1 | 6 |
| U- | 3 | 7 |
| U+ | 4 |   |
+----+---+---+"
        );
        assert_eq!(
            results[1].to_pretty().to_string(),
            "\
+---+---+---+
| - | 2 |   |
+---+---+---+"
        );
    }

    #[test]
    fn test_eliminate_adjacent_noop_update() {
        let c = StreamChunk::from_pretty(
            " I I
            - 1 6 D
            - 2 2
            + 2 3
            - 2 3
            + 1 6
            - 1 7
            + 1 10 D
            + 1 7
            U- 3 7
            U+ 3 7
            + 2 3",
        );
        let c = c.eliminate_adjacent_noop_update();
        assert_eq!(
            c.to_pretty().to_string(),
            "\
+---+---+---+
| - | 2 | 2 |
| + | 2 | 3 |
| - | 2 | 3 |
| + | 1 | 6 |
| + | 2 | 3 |
+---+---+---+"
        );
    }
}