1use std::fmt::Display;
16use std::mem::size_of;
17use std::ops::{Deref, DerefMut};
18use std::sync::{Arc, LazyLock};
19use std::{fmt, mem};
20
21use either::Either;
22use enum_as_inner::EnumAsInner;
23use itertools::Itertools;
24use rand::prelude::SmallRng;
25use rand::{Rng, SeedableRng};
26use risingwave_common_estimate_size::EstimateSize;
27use risingwave_pb::data::{PbOp, PbStreamChunk};
28
29use super::stream_chunk_builder::StreamChunkBuilder;
30use super::{ArrayImpl, ArrayRef, ArrayResult, DataChunkTestExt, RowRef};
31use crate::array::DataChunk;
32use crate::bitmap::{Bitmap, BitmapBuilder};
33use crate::catalog::Schema;
34use crate::field_generator::VarcharProperty;
35use crate::row::Row;
36use crate::types::{DataType, DefaultOrdered, ToText};
37
/// `Op` describes how a row in a [`StreamChunk`] changes the stream.
///
/// An update is conventionally encoded as an `UpdateDelete` of the old row
/// followed by an `UpdateInsert` of the new row (see
/// `eliminate_adjacent_noop_update`, which relies on this adjacency).
#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash, EnumAsInner)]
pub enum Op {
    /// Insert a new row.
    Insert,
    /// Delete an existing row.
    Delete,
    /// Delete of the old row of an update.
    UpdateDelete,
    /// Insert of the new row of an update.
    UpdateInsert,
}
51
52impl Op {
53 pub fn to_protobuf(self) -> PbOp {
54 match self {
55 Op::Insert => PbOp::Insert,
56 Op::Delete => PbOp::Delete,
57 Op::UpdateInsert => PbOp::UpdateInsert,
58 Op::UpdateDelete => PbOp::UpdateDelete,
59 }
60 }
61
62 pub fn from_protobuf(prost: &i32) -> ArrayResult<Op> {
63 let op = match PbOp::try_from(*prost) {
64 Ok(PbOp::Insert) => Op::Insert,
65 Ok(PbOp::Delete) => Op::Delete,
66 Ok(PbOp::UpdateInsert) => Op::UpdateInsert,
67 Ok(PbOp::UpdateDelete) => Op::UpdateDelete,
68 Ok(PbOp::Unspecified) => unreachable!(),
69 Err(_) => bail!("No such op type"),
70 };
71 Ok(op)
72 }
73
74 pub fn normalize_update(self) -> Op {
76 match self {
77 Op::Insert => Op::Insert,
78 Op::Delete => Op::Delete,
79 Op::UpdateDelete => Op::Delete,
80 Op::UpdateInsert => Op::Insert,
81 }
82 }
83
84 pub fn to_i16(self) -> i16 {
85 match self {
86 Op::Insert => 1,
87 Op::Delete => 2,
88 Op::UpdateInsert => 3,
89 Op::UpdateDelete => 4,
90 }
91 }
92
93 pub fn to_varchar(self) -> String {
94 match self {
95 Op::Insert => "Insert",
96 Op::Delete => "Delete",
97 Op::UpdateInsert => "UpdateInsert",
98 Op::UpdateDelete => "UpdateDelete",
99 }
100 .to_owned()
101 }
102}
103
/// `StreamChunk` is a chunk of rows in a stream: a [`DataChunk`] plus one
/// [`Op`] per row describing how the row changes the stream.
#[derive(Clone, PartialEq)]
pub struct StreamChunk {
    // One op per row of capacity; `ops.len()` always matches every column's
    // length (asserted in `with_visibility`).
    ops: Arc<[Op]>,
    // Columns and the visibility bitmap.
    data: DataChunk,
}
111
112impl Default for StreamChunk {
113 fn default() -> Self {
117 Self {
118 ops: Arc::new([]),
119 data: DataChunk::new(vec![], 0),
120 }
121 }
122}
123
impl StreamChunk {
    /// Build a `StreamChunk` from ops and columns, with all rows visible.
    ///
    /// # Panics
    /// Panics if any column's length differs from `ops.len()`.
    pub fn new(ops: impl Into<Arc<[Op]>>, columns: Vec<ArrayRef>) -> Self {
        let ops = ops.into();
        let visibility = Bitmap::ones(ops.len());
        Self::with_visibility(ops, columns, visibility)
    }

    /// Build a `StreamChunk` with an explicit visibility bitmap.
    ///
    /// # Panics
    /// Panics if any column's length differs from `ops.len()`.
    pub fn with_visibility(
        ops: impl Into<Arc<[Op]>>,
        columns: Vec<ArrayRef>,
        visibility: Bitmap,
    ) -> Self {
        let ops = ops.into();
        // Each column must carry exactly one value per op.
        for col in &columns {
            assert_eq!(col.len(), ops.len());
        }
        let data = DataChunk::new(columns, visibility);
        StreamChunk { ops, data }
    }

    /// Build a single chunk from `(op, row)` pairs.
    ///
    /// # Panics
    /// Panics if `rows` is empty (the builder then yields no chunk).
    pub fn from_rows(rows: &[(Op, impl Row)], data_types: &[DataType]) -> Self {
        let mut builder = StreamChunkBuilder::unlimited(data_types.to_vec(), Some(rows.len()));

        for (op, row) in rows {
            // An unlimited builder never flushes mid-append.
            let none = builder.append_row(*op, row);
            debug_assert!(none.is_none());
        }

        builder.take().expect("chunk should not be empty")
    }

    /// A chunk with the given schema and zero rows.
    pub fn empty(data_types: &[DataType]) -> Self {
        StreamChunkBuilder::build_empty(data_types.to_vec())
    }

    /// The underlying data chunk (columns + visibility), without ops.
    pub fn data_chunk(&self) -> &DataChunk {
        &self.data
    }

    /// Remove invisible rows, producing a chunk where every row is visible.
    ///
    /// Returns `self` unchanged if visibility is already compacted.
    pub fn compact_vis(self) -> Self {
        if self.is_vis_compacted() {
            return self;
        }

        let (ops, columns, visibility) = self.into_inner();

        // Number of visible rows.
        let cardinality = visibility
            .iter()
            .fold(0, |vis_cnt, vis| vis_cnt + vis as usize);
        let columns: Vec<_> = columns
            .into_iter()
            .map(|col| col.compact_vis(&visibility, cardinality).into())
            .collect();
        // Keep only the ops of visible rows, preserving order.
        let mut new_ops = Vec::with_capacity(cardinality);
        for idx in visibility.iter_ones() {
            new_ops.push(ops[idx]);
        }
        StreamChunk::new(new_ops, columns)
    }

    /// Split into chunks of at most `size` visible rows each.
    pub fn split(&self, size: usize) -> Vec<Self> {
        let mut builder = StreamChunkBuilder::new(size, self.data_types());
        let mut outputs = Vec::new();

        for (op, row) in self.rows() {
            // The builder emits a full chunk whenever `size` rows accumulate.
            if let Some(chunk) = builder.append_row(op, row) {
                outputs.push(chunk);
            }
        }
        // Flush the trailing, possibly partial chunk.
        if let Some(output) = builder.take() {
            outputs.push(output);
        }

        outputs
    }

    /// Decompose into the data chunk and the ops.
    pub fn into_parts(self) -> (DataChunk, Arc<[Op]>) {
        (self.data, self.ops)
    }

    /// Reassemble a chunk from ops and a data chunk (inverse of `into_parts`).
    pub fn from_parts(ops: impl Into<Arc<[Op]>>, data_chunk: DataChunk) -> Self {
        let (columns, vis) = data_chunk.into_parts();
        Self::with_visibility(ops, columns, vis)
    }

    /// Decompose into ops, columns and the visibility bitmap.
    pub fn into_inner(self) -> (Arc<[Op]>, Vec<ArrayRef>, Bitmap) {
        let (columns, vis) = self.data.into_parts();
        (self.ops, columns, vis)
    }

    /// Serialize to protobuf. Compacts visibility first so that every
    /// serialized row is visible.
    pub fn to_protobuf(&self) -> PbStreamChunk {
        if !self.is_vis_compacted() {
            // NOTE: clones the whole chunk; hot paths should compact first.
            return self.clone().compact_vis().to_protobuf();
        }
        PbStreamChunk {
            cardinality: self.cardinality() as u32,
            ops: self.ops.iter().map(|op| op.to_protobuf() as i32).collect(),
            columns: self.columns().iter().map(|col| col.to_protobuf()).collect(),
        }
    }

    /// Deserialize from protobuf; fails on unknown op values or malformed
    /// column data.
    pub fn from_protobuf(prost: &PbStreamChunk) -> ArrayResult<Self> {
        let cardinality = prost.get_cardinality() as usize;
        let mut ops = Vec::with_capacity(cardinality);
        for op in prost.get_ops() {
            ops.push(Op::from_protobuf(op)?);
        }
        let mut columns = vec![];
        for column in prost.get_columns() {
            columns.push(ArrayImpl::from_protobuf(column, cardinality)?.into());
        }
        Ok(StreamChunk::new(ops, columns))
    }

    /// The per-row ops, one per row of capacity (including invisible rows).
    pub fn ops(&self) -> &[Op] {
        &self.ops
    }

    /// Pretty-print the chunk as an ASCII table (op column + datums).
    pub fn to_pretty(&self) -> impl Display + use<> {
        self.to_pretty_inner(None)
    }

    /// Pretty-print with a header row taken from `schema` field names.
    pub fn to_pretty_with_schema(&self, schema: &Schema) -> impl Display + use<> {
        self.to_pretty_inner(Some(schema))
    }

    /// Shared implementation of pretty printing; `schema` adds a header row.
    fn to_pretty_inner(&self, schema: Option<&Schema>) -> impl Display + use<> {
        use comfy_table::{Cell, CellAlignment, Table};

        if self.cardinality() == 0 {
            return Either::Left("(empty)");
        }

        let mut table = Table::new();
        table.load_preset(DataChunk::PRETTY_TABLE_PRESET);

        if let Some(schema) = schema {
            assert_eq!(self.dimension(), schema.len());
            // The first header cell is blank: it sits above the op column.
            let cells = std::iter::once(String::new())
                .chain(schema.fields().iter().map(|f| f.name.clone()));
            table.set_header(cells);
        }

        for (op, row_ref) in self.rows() {
            let mut cells = Vec::with_capacity(row_ref.len() + 1);
            cells.push(
                Cell::new(match op {
                    Op::Insert => "+",
                    Op::Delete => "-",
                    Op::UpdateDelete => "U-",
                    Op::UpdateInsert => "U+",
                })
                .set_alignment(CellAlignment::Right),
            );
            for datum in row_ref.iter() {
                // NULL renders as an empty cell.
                let str = match datum {
                    None => "".to_owned(),
                    Some(scalar) => scalar.to_text(),
                };
                cells.push(Cell::new(str));
            }
            table.add_row(cells);
        }

        Either::Right(table)
    }

    /// Keep only the columns at `indices`, in that order; ops are shared.
    pub fn project(&self, indices: &[usize]) -> Self {
        Self {
            ops: self.ops.clone(),
            data: self.data.project(indices),
        }
    }

    /// Hide adjacent row pairs that cancel out (a delete-like op immediately
    /// followed by an insert-like op of an identical row, or vice versa);
    /// then, for any `U-`/`U+` pair of which only one half survived,
    /// downgrade the surviving half to a plain `Delete`/`Insert` so update
    /// pairs stay well-formed.
    pub fn eliminate_adjacent_noop_update(self) -> Self {
        let len = self.data_chunk().capacity();
        let mut c: StreamChunkMut = self.into();
        let mut prev_r = None;
        for curr in 0..len {
            // Skip rows that are already invisible.
            if !c.vis(curr) {
                continue;
            }
            // Cancel `prev`/`curr` when they are opposite ops on equal rows.
            if let Some(prev) = prev_r
                && ((matches!(c.op(prev), Op::UpdateDelete | Op::Delete)
                    && matches!(c.op(curr), Op::UpdateInsert | Op::Insert))
                    || (matches!(c.op(prev), Op::UpdateInsert | Op::Insert)
                        && matches!(c.op(curr), Op::UpdateDelete | Op::Delete)))
                && c.row_ref(prev) == c.row_ref(curr)
            {
                c.set_vis(prev, false);
                c.set_vis(curr, false);
                prev_r = None;
            } else {
                prev_r = Some(curr);
            }
        }

        // Second pass: repair update pairs where exactly one half was hidden.
        for idx in 0..len.saturating_sub(1) {
            if c.op(idx) == Op::UpdateDelete && c.op(idx + 1) == Op::UpdateInsert {
                let delete_vis = c.vis(idx);
                let insert_vis = c.vis(idx + 1);
                if delete_vis && !insert_vis {
                    c.set_op(idx, Op::Delete);
                } else if !delete_vis && insert_vis {
                    c.set_op(idx + 1, Op::Insert);
                }
            }
        }
        c.into()
    }

    /// Project columns and replace visibility in one step; ops are shared.
    pub fn project_with_vis(&self, indices: &[usize], vis: Bitmap) -> Self {
        Self {
            ops: self.ops.clone(),
            data: self.data.project_with_vis(indices, vis),
        }
    }

    /// Clone the chunk with a different visibility bitmap; ops are shared.
    pub fn clone_with_vis(&self, vis: Bitmap) -> Self {
        Self {
            ops: self.ops.clone(),
            data: self.data.with_visibility(vis),
        }
    }
}
402
impl Deref for StreamChunk {
    type Target = DataChunk;

    /// Delegate `DataChunk` methods (e.g. `cardinality`, `capacity`,
    /// `columns`) to the underlying data chunk.
    fn deref(&self) -> &Self::Target {
        &self.data
    }
}
410
impl DerefMut for StreamChunk {
    /// Mutable access to the underlying data chunk.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.data
    }
}
416
417impl From<DataChunk> for StreamChunk {
419 fn from(data: DataChunk) -> Self {
420 Self::from_parts(vec![Op::Insert; data.capacity()], data)
421 }
422}
423
424impl fmt::Debug for StreamChunk {
425 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
426 if f.alternate() {
427 write!(
428 f,
429 "StreamChunk {{ cardinality: {}, capacity: {}, data:\n{}\n }}",
430 self.cardinality(),
431 self.capacity(),
432 self.to_pretty()
433 )
434 } else {
435 f.debug_struct("StreamChunk")
436 .field("cardinality", &self.cardinality())
437 .field("capacity", &self.capacity())
438 .finish_non_exhaustive()
439 }
440 }
441}
442
impl EstimateSize for StreamChunk {
    /// Heap usage: the data chunk's heap size plus the ops slice behind the
    /// `Arc` (`len * size_of::<Op>()` bytes).
    fn estimated_heap_size(&self) -> usize {
        self.data.estimated_heap_size() + self.ops.len() * size_of::<Op>()
    }
}
448
/// Internal state of [`OpsMut`]: either still sharing the immutable ops
/// (`ArcRef`) or already copied into an owned, mutable `Vec` (`Mut`).
enum OpsMutState {
    ArcRef(Arc<[Op]>),
    Mut(Vec<Op>),
}
453
impl OpsMutState {
    // Placeholder used while swapping states in `OpsMut::set`; an empty
    // `Vec` never allocates, so this is free to construct.
    const UNDEFINED: Self = Self::Mut(Vec::new());
}
457
/// Copy-on-write wrapper over `Arc<[Op]>`: reads go through the shared slice
/// until the first `set`, which clones the ops into an owned `Vec`.
pub struct OpsMut {
    state: OpsMutState,
}
461
462impl OpsMut {
463 pub fn new(ops: Arc<[Op]>) -> Self {
464 Self {
465 state: OpsMutState::ArcRef(ops),
466 }
467 }
468
469 pub fn len(&self) -> usize {
470 match &self.state {
471 OpsMutState::ArcRef(v) => v.len(),
472 OpsMutState::Mut(v) => v.len(),
473 }
474 }
475
476 pub fn is_empty(&self) -> bool {
477 self.len() == 0
478 }
479
480 pub fn set(&mut self, n: usize, val: Op) {
481 debug_assert!(n < self.len());
482 if let OpsMutState::Mut(v) = &mut self.state {
483 v[n] = val;
484 } else {
485 let state = mem::replace(&mut self.state, OpsMutState::UNDEFINED); let mut v = match state {
487 OpsMutState::ArcRef(v) => v.to_vec(),
488 OpsMutState::Mut(_) => unreachable!(),
489 };
490 v[n] = val;
491 self.state = OpsMutState::Mut(v);
492 }
493 }
494
495 pub fn get(&self, n: usize) -> Op {
496 debug_assert!(n < self.len());
497 match &self.state {
498 OpsMutState::ArcRef(v) => v[n],
499 OpsMutState::Mut(v) => v[n],
500 }
501 }
502}
503impl From<OpsMut> for Arc<[Op]> {
504 fn from(v: OpsMut) -> Self {
505 match v.state {
506 OpsMutState::ArcRef(a) => a,
507 OpsMutState::Mut(v) => v.into(),
508 }
509 }
510}
511
/// A mutable view of a [`StreamChunk`]: ops and visibility can be changed in
/// place, while column data stays shared and immutable.
pub struct StreamChunkMut {
    // Shared, immutable column data.
    columns: Arc<[ArrayRef]>,
    // Copy-on-write ops.
    ops: OpsMut,
    // Visibility bitmap, mutable in place.
    vis: BitmapBuilder,
}
519
impl From<StreamChunk> for StreamChunkMut {
    /// Open a chunk for mutation without copying column data.
    fn from(c: StreamChunk) -> Self {
        // Split into (data, ops), then data into columns + visibility.
        let (c, ops) = c.into_parts();
        let (columns, vis) = c.into_parts_v2();
        Self {
            columns,
            ops: OpsMut::new(ops),
            vis: vis.into(),
        }
    }
}
531
impl From<StreamChunkMut> for StreamChunk {
    /// Freeze the mutable view back into an immutable chunk.
    fn from(c: StreamChunkMut) -> Self {
        StreamChunk::from_parts(c.ops, DataChunk::from_parts(c.columns, c.vis.finish()))
    }
}
537
/// A mutable handle to one row (its op and visibility) of a
/// [`StreamChunkMut`].
pub struct OpRowMutRef<'a> {
    // The chunk this row belongs to.
    c: &'a mut StreamChunkMut,
    // Row index within the chunk.
    i: usize,
}
542
impl Default for OpRowMutRef<'_> {
    /// A placeholder reference pointing at a lazily-initialized, empty
    /// static chunk (row index 0).
    fn default() -> Self {
        // Shared empty chunk backing every defaulted reference.
        static mut DUMMY_CHUNK_MUT: LazyLock<StreamChunkMut> =
            LazyLock::new(|| StreamChunk::default().into());

        // NOTE(review): this hands out `&mut` to a `static mut`, so two live
        // `Default` values would alias mutably (UB if either is written
        // through). Presumably defaulted refs are never mutated — TODO confirm.
        #[allow(clippy::deref_addrof)]
        OpRowMutRef {
            c: unsafe { &mut *(&raw mut DUMMY_CHUNK_MUT) },
            i: 0,
        }
    }
}
556
impl PartialEq for OpRowMutRef<'_> {
    /// Two refs are equal when the rows' datums are equal; the op and the
    /// visibility flag are NOT compared.
    fn eq(&self, other: &Self) -> bool {
        self.row_ref() == other.row_ref()
    }
}
impl Eq for OpRowMutRef<'_> {}
563
impl<'a> OpRowMutRef<'a> {
    /// Row index within the chunk.
    pub fn index(&self) -> usize {
        self.i
    }

    /// Whether this row is visible.
    pub fn vis(&self) -> bool {
        self.c.vis.is_set(self.i)
    }

    /// The op of this row.
    pub fn op(&self) -> Op {
        self.c.ops.get(self.i)
    }

    /// Set this row's visibility.
    pub fn set_vis(&mut self, val: bool) {
        self.c.set_vis(self.i, val);
    }

    /// Set this row's op.
    pub fn set_op(&mut self, val: Op) {
        self.c.set_op(self.i, val);
    }

    /// The row's datums.
    pub fn row_ref(&self) -> RowRef<'_> {
        RowRef::with_columns(self.c.columns(), self.i)
    }

    /// Whether both refs point into the same chunk (pointer identity).
    pub fn same_chunk(&self, other: &Self) -> bool {
        std::ptr::eq(self.c, other.c)
    }
}
594
impl StreamChunkMut {
    /// Total number of rows, visible or not.
    pub fn capacity(&self) -> usize {
        self.vis.len()
    }

    /// Whether row `i` is visible.
    pub fn vis(&self, i: usize) -> bool {
        self.vis.is_set(i)
    }

    /// The op of row `i`.
    pub fn op(&self, i: usize) -> Op {
        self.ops.get(i)
    }

    /// The datums of row `i`.
    pub fn row_ref(&self, i: usize) -> RowRef<'_> {
        RowRef::with_columns(self.columns(), i)
    }

    /// Set the visibility of row `n`.
    pub fn set_vis(&mut self, n: usize, val: bool) {
        self.vis.set(n, val);
    }

    /// Set the op of row `n` (copies the ops on first write).
    pub fn set_op(&mut self, n: usize, val: Op) {
        self.ops.set(n, val);
    }

    /// The shared column data.
    pub fn columns(&self) -> &[ArrayRef] {
        &self.columns
    }

    /// Iterate over visible rows, yielding each row's datums together with a
    /// mutable op/visibility handle for that row.
    pub fn to_rows_mut(&mut self) -> impl Iterator<Item = (RowRef<'_>, OpRowMutRef<'_>)> {
        // NOTE(review): each yielded `OpRowMutRef` holds a `&mut self`
        // manufactured from a raw-pointer cast, so multiple live items alias
        // the chunk mutably. Presumably callers only mutate through the
        // handle of the row currently being processed — TODO confirm
        // soundness.
        unsafe {
            (0..self.vis.len())
                .filter(|i| self.vis.is_set(*i))
                .map(|i| {
                    let p = self as *const StreamChunkMut;
                    let p = p as *mut StreamChunkMut;
                    (
                        RowRef::with_columns(self.columns(), i),
                        OpRowMutRef { c: &mut *p, i },
                    )
                })
        }
    }
}
642
643#[easy_ext::ext(StreamChunkTestExt)]
645impl StreamChunk {
646 pub fn from_pretty(s: &str) -> Self {
687 let mut chunk_str = String::new();
688 let mut ops = vec![];
689
690 let (header, body) = match s.split_once('\n') {
691 Some(pair) => pair,
692 None => {
693 return StreamChunk {
695 ops: Arc::new([]),
696 data: DataChunk::from_pretty(s),
697 };
698 }
699 };
700 chunk_str.push_str(header);
701 chunk_str.push('\n');
702
703 for line in body.split_inclusive('\n') {
704 if line.trim_start().is_empty() {
705 continue;
706 }
707 let (op, row) = line
708 .trim_start()
709 .split_once(|c: char| c.is_ascii_whitespace())
710 .ok_or_else(|| panic!("missing operation: {line:?}"))
711 .unwrap();
712 ops.push(match op {
713 "+" => Op::Insert,
714 "-" => Op::Delete,
715 "U+" => Op::UpdateInsert,
716 "U-" => Op::UpdateDelete,
717 t => panic!("invalid op: {t:?}"),
718 });
719 chunk_str.push_str(row);
720 }
721 StreamChunk {
722 ops: ops.into(),
723 data: DataChunk::from_pretty(&chunk_str),
724 }
725 }
726
727 pub fn valid(&self) -> bool {
729 let len = self.ops.len();
730 let data = &self.data;
731 data.visibility().len() == len && data.columns().iter().all(|col| col.len() == len)
732 }
733
734 pub fn concat(chunks: Vec<StreamChunk>) -> StreamChunk {
738 let data_types = chunks[0].data_types();
739 let size = chunks.iter().map(|c| c.cardinality()).sum::<usize>();
740
741 let mut builder = StreamChunkBuilder::unlimited(data_types, Some(size));
742
743 for chunk in chunks {
744 for (op, row) in chunk.rows() {
746 let none = builder.append_row(op, row);
747 debug_assert!(none.is_none());
748 }
749 }
750
751 builder.take().expect("chunk should not be empty")
752 }
753
754 pub fn sort_rows(self) -> Self {
756 if self.capacity() == 0 {
757 return self;
758 }
759 let rows = self.rows().collect_vec();
760 let mut idx = (0..self.capacity()).collect_vec();
761 idx.sort_by_key(|&i| {
762 let (op, row_ref) = rows[i];
763 (op, DefaultOrdered(row_ref))
764 });
765 StreamChunk {
766 ops: idx.iter().map(|&i| self.ops[i]).collect(),
767 data: self.data.reorder_rows(&idx),
768 }
769 }
770
771 pub fn gen_stream_chunks(
775 num_of_chunks: usize,
776 chunk_size: usize,
777 data_types: &[DataType],
778 varchar_properties: &VarcharProperty,
779 ) -> Vec<StreamChunk> {
780 Self::gen_stream_chunks_inner(
781 num_of_chunks,
782 chunk_size,
783 data_types,
784 varchar_properties,
785 1.0,
786 1.0,
787 )
788 }
789
790 pub fn gen_stream_chunks_inner(
791 num_of_chunks: usize,
792 chunk_size: usize,
793 data_types: &[DataType],
794 varchar_properties: &VarcharProperty,
795 visibility_percent: f64, inserts_percent: f64, ) -> Vec<StreamChunk> {
798 let ops = if inserts_percent == 0.0 {
799 vec![Op::Delete; chunk_size]
800 } else if inserts_percent == 1.0 {
801 vec![Op::Insert; chunk_size]
802 } else {
803 let mut rng = SmallRng::from_seed([0; 32]);
804 let mut ops = vec![];
805 for _ in 0..chunk_size {
806 ops.push(if rng.random_bool(inserts_percent) {
807 Op::Insert
808 } else {
809 Op::Delete
810 });
811 }
812 ops
813 };
814 DataChunk::gen_data_chunks(
815 num_of_chunks,
816 chunk_size,
817 data_types,
818 varchar_properties,
819 visibility_percent,
820 )
821 .into_iter()
822 .map(|chunk| StreamChunk::from_parts(ops.clone(), chunk))
823 .collect()
824 }
825}
826
#[cfg(test)]
mod tests {
    use super::*;

    // Pretty printing renders one row per line with a right-aligned op cell
    // and blank cells for NULL datums.
    #[test]
    fn test_to_pretty_string() {
        let chunk = StreamChunk::from_pretty(
            " I I
            + 1 6
            - 2 .
            U- 3 7
            U+ 4 .",
        );
        assert_eq!(
            chunk.to_pretty().to_string(),
            "\
+----+---+---+
| + | 1 | 6 |
| - | 2 | |
| U- | 3 | 7 |
| U+ | 4 | |
+----+---+---+"
        );
    }

    // Splitting a 4-row chunk in halves of 2 keeps row order and ops.
    #[test]
    fn test_split_1() {
        let chunk = StreamChunk::from_pretty(
            " I I
            + 1 6
            - 2 .
            U- 3 7
            U+ 4 .",
        );
        let results = chunk.split(2);
        assert_eq!(2, results.len());
        assert_eq!(
            results[0].to_pretty().to_string(),
            "\
+---+---+---+
| + | 1 | 6 |
| - | 2 | |
+---+---+---+"
        );
        assert_eq!(
            results[1].to_pretty().to_string(),
            "\
+----+---+---+
| U- | 3 | 7 |
| U+ | 4 | |
+----+---+---+"
        );
    }

    // Splitting may leave a final partial chunk when the row count is not a
    // multiple of the split size.
    #[test]
    fn test_split_2() {
        let chunk = StreamChunk::from_pretty(
            " I I
            + 1 6
            U- 3 7
            U+ 4 .
            - 2 .",
        );
        let results = chunk.split(2);
        assert_eq!(2, results.len());
        assert_eq!(
            results[0].to_pretty().to_string(),
            "\
+----+---+---+
| + | 1 | 6 |
| U- | 3 | 7 |
| U+ | 4 | |
+----+---+---+"
        );
        assert_eq!(
            results[1].to_pretty().to_string(),
            "\
+---+---+---+
| - | 2 | |
+---+---+---+"
        );
    }

    // Adjacent opposite ops on identical rows are hidden; the `D` suffix in
    // `from_pretty` marks rows that start out invisible.
    #[test]
    fn test_eliminate_adjacent_noop_update() {
        let c = StreamChunk::from_pretty(
            " I I
            - 1 6 D
            - 2 2
            + 2 3
            - 2 3
            + 1 6
            - 1 7
            + 1 10 D
            + 1 7
            U- 3 7
            U+ 3 7
            + 2 3",
        );
        let c = c.eliminate_adjacent_noop_update();
        assert_eq!(
            c.to_pretty().to_string(),
            "\
+---+---+---+
| - | 2 | 2 |
| + | 1 | 6 |
| + | 2 | 3 |
+---+---+---+"
        );
    }

    // When only the `U+` half of an update pair survives elimination, it is
    // downgraded to a plain `+`.
    #[test]
    fn test_eliminate_adjacent_noop_update_normalize_update_pair() {
        let c = StreamChunk::from_pretty(
            " I I
            + 1 10
            U- 1 10
            U+ 1 20",
        );
        let c = c.eliminate_adjacent_noop_update();
        assert_eq!(
            c.to_pretty().to_string(),
            "\
+---+---+----+
| + | 1 | 20 |
+---+---+----+"
        );
    }
}