1use std::fmt::Display;
16use std::mem::size_of;
17use std::ops::{Deref, DerefMut};
18use std::sync::Arc;
19use std::{fmt, mem};
20
21use either::Either;
22use enum_as_inner::EnumAsInner;
23use itertools::Itertools;
24use rand::prelude::SmallRng;
25use rand::{Rng, SeedableRng};
26use risingwave_common_estimate_size::EstimateSize;
27use risingwave_pb::data::{PbOp, PbStreamChunk};
28
29use super::stream_chunk_builder::StreamChunkBuilder;
30use super::{ArrayImpl, ArrayRef, ArrayResult, DataChunkTestExt, RowRef};
31use crate::array::DataChunk;
32use crate::bitmap::{Bitmap, BitmapBuilder};
33use crate::catalog::Schema;
34use crate::field_generator::VarcharProperty;
35use crate::row::Row;
36use crate::types::{DataType, DefaultOrdered, ToText};
37
/// The kind of change a row in a [`StreamChunk`] represents.
///
/// `UpdateDelete`/`UpdateInsert` are the two halves of an update; presumably
/// they appear as an adjacent pair within a chunk (as assumed by
/// `eliminate_adjacent_noop_update`) — TODO confirm against upstream docs.
#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash, EnumAsInner)]
pub enum Op {
    /// Insert a new row.
    Insert,
    /// Delete an existing row.
    Delete,
    /// The "old value" half of an update; normalized to `Delete` by [`Op::normalize_update`].
    UpdateDelete,
    /// The "new value" half of an update; normalized to `Insert` by [`Op::normalize_update`].
    UpdateInsert,
}
51
52impl Op {
53 pub fn to_protobuf(self) -> PbOp {
54 match self {
55 Op::Insert => PbOp::Insert,
56 Op::Delete => PbOp::Delete,
57 Op::UpdateInsert => PbOp::UpdateInsert,
58 Op::UpdateDelete => PbOp::UpdateDelete,
59 }
60 }
61
62 pub fn from_protobuf(prost: &i32) -> ArrayResult<Op> {
63 let op = match PbOp::try_from(*prost) {
64 Ok(PbOp::Insert) => Op::Insert,
65 Ok(PbOp::Delete) => Op::Delete,
66 Ok(PbOp::UpdateInsert) => Op::UpdateInsert,
67 Ok(PbOp::UpdateDelete) => Op::UpdateDelete,
68 Ok(PbOp::Unspecified) => unreachable!(),
69 Err(_) => bail!("No such op type"),
70 };
71 Ok(op)
72 }
73
74 pub fn normalize_update(self) -> Op {
76 match self {
77 Op::Insert => Op::Insert,
78 Op::Delete => Op::Delete,
79 Op::UpdateDelete => Op::Delete,
80 Op::UpdateInsert => Op::Insert,
81 }
82 }
83
84 pub fn to_i16(self) -> i16 {
85 match self {
86 Op::Insert => 1,
87 Op::Delete => 2,
88 Op::UpdateInsert => 3,
89 Op::UpdateDelete => 4,
90 }
91 }
92
93 pub fn to_varchar(self) -> String {
94 match self {
95 Op::Insert => "Insert",
96 Op::Delete => "Delete",
97 Op::UpdateInsert => "UpdateInsert",
98 Op::UpdateDelete => "UpdateDelete",
99 }
100 .to_owned()
101 }
102}
103
/// A batch of stream changes: a columnar [`DataChunk`] paired with one
/// [`Op`] per row describing how that row changes the stream.
#[derive(Clone, PartialEq)]
pub struct StreamChunk {
    /// One op per row; its length matches every column's length
    /// (asserted in `with_visibility`).
    ops: Arc<[Op]>,
    /// The underlying columns plus the row-visibility bitmap.
    data: DataChunk,
}
111
impl Default for StreamChunk {
    /// An empty chunk: zero rows, zero columns, no ops.
    fn default() -> Self {
        Self {
            ops: Arc::new([]),
            data: DataChunk::new(vec![], 0),
        }
    }
}
123
impl StreamChunk {
    /// Builds a `StreamChunk` from ops and columns with all rows visible.
    ///
    /// # Panics
    /// Panics if any column's length differs from `ops.len()`
    /// (via `with_visibility`'s assertion).
    pub fn new(ops: impl Into<Arc<[Op]>>, columns: Vec<ArrayRef>) -> Self {
        let ops = ops.into();
        let visibility = Bitmap::ones(ops.len());
        Self::with_visibility(ops, columns, visibility)
    }

    /// Builds a `StreamChunk` with an explicit row-visibility bitmap.
    ///
    /// # Panics
    /// Panics if any column's length differs from `ops.len()`.
    pub fn with_visibility(
        ops: impl Into<Arc<[Op]>>,
        columns: Vec<ArrayRef>,
        visibility: Bitmap,
    ) -> Self {
        let ops = ops.into();
        for col in &columns {
            assert_eq!(col.len(), ops.len());
        }
        let data = DataChunk::new(columns, visibility);
        StreamChunk { ops, data }
    }

    /// Builds a `StreamChunk` from `(op, row)` pairs.
    ///
    /// # Panics
    /// Panics if `rows` is empty (the builder yields no chunk).
    pub fn from_rows(rows: &[(Op, impl Row)], data_types: &[DataType]) -> Self {
        let mut builder = StreamChunkBuilder::unlimited(data_types.to_vec(), Some(rows.len()));

        for (op, row) in rows {
            let none = builder.append_row(*op, row);
            // The builder is unlimited, so it never emits an intermediate chunk.
            debug_assert!(none.is_none());
        }

        builder.take().expect("chunk should not be empty")
    }

    /// An empty chunk (zero rows) with the given column types.
    pub fn empty(data_types: &[DataType]) -> Self {
        StreamChunkBuilder::build_empty(data_types.to_vec())
    }

    /// The underlying columnar data (columns + visibility), without ops.
    pub fn data_chunk(&self) -> &DataChunk {
        &self.data
    }

    /// Removes invisible rows, returning a chunk in which every row is
    /// visible. Returns `self` unchanged if it is already compacted.
    pub fn compact(self) -> Self {
        if self.is_compacted() {
            return self;
        }

        let (ops, columns, visibility) = self.into_inner();

        // Number of visible rows = number of set bits in the bitmap.
        let cardinality = visibility
            .iter()
            .fold(0, |vis_cnt, vis| vis_cnt + vis as usize);
        let columns: Vec<_> = columns
            .into_iter()
            .map(|col| col.compact(&visibility, cardinality).into())
            .collect();
        // Keep only the ops of visible rows, in order.
        let mut new_ops = Vec::with_capacity(cardinality);
        for idx in visibility.iter_ones() {
            new_ops.push(ops[idx]);
        }
        StreamChunk::new(new_ops, columns)
    }

    /// Splits this chunk into chunks of at most `size` visible rows each.
    /// Invisible rows are dropped in the process (only `self.rows()` are copied).
    pub fn split(&self, size: usize) -> Vec<Self> {
        let mut builder = StreamChunkBuilder::new(size, self.data_types());
        let mut outputs = Vec::new();

        for (op, row) in self.rows() {
            // The builder emits a chunk whenever `size` rows have accumulated.
            if let Some(chunk) = builder.append_row(op, row) {
                outputs.push(chunk);
            }
        }
        // Flush the trailing partial chunk, if any.
        if let Some(output) = builder.take() {
            outputs.push(output);
        }

        outputs
    }

    /// Decomposes into the data chunk and the shared op slice.
    pub fn into_parts(self) -> (DataChunk, Arc<[Op]>) {
        (self.data, self.ops)
    }

    /// Reassembles a `StreamChunk` from ops and a data chunk.
    ///
    /// # Panics
    /// Panics if column lengths differ from `ops.len()` (via `with_visibility`).
    pub fn from_parts(ops: impl Into<Arc<[Op]>>, data_chunk: DataChunk) -> Self {
        let (columns, vis) = data_chunk.into_parts();
        Self::with_visibility(ops, columns, vis)
    }

    /// Decomposes into ops, columns, and the visibility bitmap.
    pub fn into_inner(self) -> (Arc<[Op]>, Vec<ArrayRef>, Bitmap) {
        let (columns, vis) = self.data.into_parts();
        (self.ops, columns, vis)
    }

    /// Serializes to protobuf. The chunk is compacted first if needed, so
    /// the encoded form contains only visible rows.
    pub fn to_protobuf(&self) -> PbStreamChunk {
        if !self.is_compacted() {
            return self.clone().compact().to_protobuf();
        }
        PbStreamChunk {
            cardinality: self.cardinality() as u32,
            ops: self.ops.iter().map(|op| op.to_protobuf() as i32).collect(),
            columns: self.columns().iter().map(|col| col.to_protobuf()).collect(),
        }
    }

    /// Deserializes from protobuf.
    ///
    /// # Errors
    /// Returns an error if an op value or a column fails to decode.
    pub fn from_protobuf(prost: &PbStreamChunk) -> ArrayResult<Self> {
        let cardinality = prost.get_cardinality() as usize;
        let mut ops = Vec::with_capacity(cardinality);
        for op in prost.get_ops() {
            ops.push(Op::from_protobuf(op)?);
        }
        let mut columns = vec![];
        for column in prost.get_columns() {
            columns.push(ArrayImpl::from_protobuf(column, cardinality)?.into());
        }
        Ok(StreamChunk::new(ops, columns))
    }

    /// The per-row ops, including those of invisible rows.
    pub fn ops(&self) -> &[Op] {
        &self.ops
    }

    /// A pretty table rendering of the chunk (no header row).
    pub fn to_pretty(&self) -> impl Display + use<> {
        self.to_pretty_inner(None)
    }

    /// A pretty table rendering with column names from `schema` as header.
    pub fn to_pretty_with_schema(&self, schema: &Schema) -> impl Display + use<> {
        self.to_pretty_inner(Some(schema))
    }

    /// Shared implementation for the pretty printers: one table row per
    /// visible chunk row, with the op symbol (`+`, `-`, `U-`, `U+`) in the
    /// first column.
    fn to_pretty_inner(&self, schema: Option<&Schema>) -> impl Display + use<> {
        use comfy_table::{Cell, CellAlignment, Table};

        if self.cardinality() == 0 {
            return Either::Left("(empty)");
        }

        let mut table = Table::new();
        table.load_preset(DataChunk::PRETTY_TABLE_PRESET);

        if let Some(schema) = schema {
            assert_eq!(self.dimension(), schema.len());
            // Leading empty header cell aligns with the op column.
            let cells = std::iter::once(String::new())
                .chain(schema.fields().iter().map(|f| f.name.clone()));
            table.set_header(cells);
        }

        for (op, row_ref) in self.rows() {
            let mut cells = Vec::with_capacity(row_ref.len() + 1);
            cells.push(
                Cell::new(match op {
                    Op::Insert => "+",
                    Op::Delete => "-",
                    Op::UpdateDelete => "U-",
                    Op::UpdateInsert => "U+",
                })
                .set_alignment(CellAlignment::Right),
            );
            for datum in row_ref.iter() {
                // NULLs render as an empty cell.
                let str = match datum {
                    None => "".to_owned(), Some(scalar) => scalar.to_text(),
                };
                cells.push(Cell::new(str));
            }
            table.add_row(cells);
        }

        Either::Right(table)
    }

    /// Keeps only the columns at `indices`; ops and visibility are shared.
    pub fn project(&self, indices: &[usize]) -> Self {
        Self {
            ops: self.ops.clone(),
            data: self.data.project(indices),
        }
    }

    /// Hides adjacent visible (delete-like, insert-like) row pairs whose
    /// values are identical — such a pair is a no-op update. Works in place
    /// on the visibility bitmap; ops and columns are untouched.
    pub fn eliminate_adjacent_noop_update(self) -> Self {
        let len = self.data_chunk().capacity();
        let mut c: StreamChunkMut = self.into();
        // Index of the previous visible, not-yet-eliminated row.
        let mut prev_r = None;
        for curr in 0..len {
            if !c.vis(curr) {
                continue;
            }
            if let Some(prev) = prev_r {
                if matches!(c.op(prev), Op::UpdateDelete | Op::Delete)
                    && matches!(c.op(curr), Op::UpdateInsert | Op::Insert)
                    && c.row_ref(prev) == c.row_ref(curr)
                {
                    // No-op pair: hide both rows and restart pairing.
                    c.set_vis(prev, false);
                    c.set_vis(curr, false);
                    prev_r = None;
                } else {
                    prev_r = Some(curr)
                }
            } else {
                prev_r = Some(curr);
            }
        }
        c.into()
    }

    /// Keeps only the columns at `indices` and replaces the visibility bitmap.
    pub fn project_with_vis(&self, indices: &[usize], vis: Bitmap) -> Self {
        Self {
            ops: self.ops.clone(),
            data: self.data.project_with_vis(indices, vis),
        }
    }

    /// A copy of this chunk with a replaced visibility bitmap.
    pub fn clone_with_vis(&self, vis: Bitmap) -> Self {
        Self {
            ops: self.ops.clone(),
            data: self.data.with_visibility(vis),
        }
    }

    /// Number of rate-limiter permits this chunk consumes; currently just
    /// its capacity (total rows, visible or not).
    pub fn compute_rate_limit_chunk_permits(&self) -> u64 {
        self.capacity() as _
    }
}
372
/// Delegates read-only `DataChunk` methods (e.g. `capacity`, `columns`,
/// `data_types`) to the inner data chunk.
impl Deref for StreamChunk {
    type Target = DataChunk;

    fn deref(&self) -> &Self::Target {
        &self.data
    }
}
380
/// Mutable counterpart of the `Deref` impl; exposes `DataChunk` mutators
/// directly on `StreamChunk`.
impl DerefMut for StreamChunk {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.data
    }
}
386
387impl From<DataChunk> for StreamChunk {
389 fn from(data: DataChunk) -> Self {
390 Self::from_parts(vec![Op::Insert; data.capacity()], data)
391 }
392}
393
394impl fmt::Debug for StreamChunk {
395 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
396 if f.alternate() {
397 write!(
398 f,
399 "StreamChunk {{ cardinality: {}, capacity: {}, data:\n{}\n }}",
400 self.cardinality(),
401 self.capacity(),
402 self.to_pretty()
403 )
404 } else {
405 f.debug_struct("StreamChunk")
406 .field("cardinality", &self.cardinality())
407 .field("capacity", &self.capacity())
408 .finish_non_exhaustive()
409 }
410 }
411}
412
413impl EstimateSize for StreamChunk {
414 fn estimated_heap_size(&self) -> usize {
415 self.data.estimated_heap_size() + self.ops.len() * size_of::<Op>()
416 }
417}
418
/// Copy-on-write storage for [`OpsMut`]: either the original shared slice,
/// or an owned vector once the first mutation has occurred.
enum OpsMutState {
    /// Still sharing the original immutable slice.
    ArcRef(Arc<[Op]>),
    /// Owned copy; mutations go directly into this vector.
    Mut(Vec<Op>),
}
423
impl OpsMutState {
    /// Placeholder temporarily swapped into `OpsMut::state` while converting
    /// `ArcRef` to `Mut` (see `OpsMut::set`); never observed by callers.
    const UNDEFINED: Self = Self::Mut(Vec::new());
}
427
/// A mutable view over a shared op slice that clones the underlying storage
/// only on the first write (copy-on-write).
pub struct OpsMut {
    state: OpsMutState,
}
431
432impl OpsMut {
433 pub fn new(ops: Arc<[Op]>) -> Self {
434 Self {
435 state: OpsMutState::ArcRef(ops),
436 }
437 }
438
439 pub fn len(&self) -> usize {
440 match &self.state {
441 OpsMutState::ArcRef(v) => v.len(),
442 OpsMutState::Mut(v) => v.len(),
443 }
444 }
445
446 pub fn is_empty(&self) -> bool {
447 self.len() == 0
448 }
449
450 pub fn set(&mut self, n: usize, val: Op) {
451 debug_assert!(n < self.len());
452 if let OpsMutState::Mut(v) = &mut self.state {
453 v[n] = val;
454 } else {
455 let state = mem::replace(&mut self.state, OpsMutState::UNDEFINED); let mut v = match state {
457 OpsMutState::ArcRef(v) => v.to_vec(),
458 OpsMutState::Mut(_) => unreachable!(),
459 };
460 v[n] = val;
461 self.state = OpsMutState::Mut(v);
462 }
463 }
464
465 pub fn get(&self, n: usize) -> Op {
466 debug_assert!(n < self.len());
467 match &self.state {
468 OpsMutState::ArcRef(v) => v[n],
469 OpsMutState::Mut(v) => v[n],
470 }
471 }
472}
473impl From<OpsMut> for Arc<[Op]> {
474 fn from(v: OpsMut) -> Self {
475 match v.state {
476 OpsMutState::ArcRef(a) => a,
477 OpsMutState::Mut(v) => v.into(),
478 }
479 }
480}
481
/// A mutable view of a [`StreamChunk`]: the column data stays shared and
/// read-only, while per-row ops and visibility can be updated in place.
pub struct StreamChunkMut {
    columns: Arc<[ArrayRef]>,
    ops: OpsMut,
    vis: BitmapBuilder,
}
489
490impl From<StreamChunk> for StreamChunkMut {
491 fn from(c: StreamChunk) -> Self {
492 let (c, ops) = c.into_parts();
493 let (columns, vis) = c.into_parts_v2();
494 Self {
495 columns,
496 ops: OpsMut::new(ops),
497 vis: vis.into(),
498 }
499 }
500}
501
502impl From<StreamChunkMut> for StreamChunk {
503 fn from(c: StreamChunkMut) -> Self {
504 StreamChunk::from_parts(c.ops, DataChunk::from_parts(c.columns, c.vis.finish()))
505 }
506}
507
/// A mutable handle to one row of a [`StreamChunkMut`], allowing its op and
/// visibility to be changed while the row's values stay read-only.
pub struct OpRowMutRef<'a> {
    c: &'a mut StreamChunkMut,
    i: usize,
}
512
impl OpRowMutRef<'_> {
    /// The row's index within the chunk.
    pub fn index(&self) -> usize {
        self.i
    }

    /// Whether this row is currently visible.
    pub fn vis(&self) -> bool {
        self.c.vis.is_set(self.i)
    }

    /// The row's current op.
    pub fn op(&self) -> Op {
        self.c.ops.get(self.i)
    }

    /// Sets the row's visibility.
    pub fn set_vis(&mut self, val: bool) {
        self.c.set_vis(self.i, val);
    }

    /// Sets the row's op.
    pub fn set_op(&mut self, val: Op) {
        self.c.set_op(self.i, val);
    }

    /// A read-only reference to the row's values.
    pub fn row_ref(&self) -> RowRef<'_> {
        RowRef::with_columns(self.c.columns(), self.i)
    }

    /// Whether two row refs point into the same underlying chunk
    /// (pointer identity on the `StreamChunkMut`).
    pub fn same_chunk(&self, other: &Self) -> bool {
        std::ptr::eq(self.c, other.c)
    }
}
543
impl StreamChunkMut {
    /// Total number of rows, visible or not.
    pub fn capacity(&self) -> usize {
        self.vis.len()
    }

    /// Whether row `i` is visible.
    pub fn vis(&self, i: usize) -> bool {
        self.vis.is_set(i)
    }

    /// The op of row `i`.
    pub fn op(&self, i: usize) -> Op {
        self.ops.get(i)
    }

    /// A read-only reference to the values of row `i`.
    pub fn row_ref(&self, i: usize) -> RowRef<'_> {
        RowRef::with_columns(self.columns(), i)
    }

    /// Sets the visibility of row `n`.
    pub fn set_vis(&mut self, n: usize, val: bool) {
        self.vis.set(n, val);
    }

    /// Sets the op of row `n` (clones the shared op slice on first write).
    pub fn set_op(&mut self, n: usize, val: Op) {
        self.ops.set(n, val);
    }

    /// The shared column arrays.
    pub fn columns(&self) -> &[ArrayRef] {
        &self.columns
    }

    /// Iterates over the visible rows, yielding for each a value reference
    /// and a mutable op/visibility handle.
    ///
    /// NOTE(review): the unsafe block manufactures a fresh `&mut self` per
    /// yielded row via a raw-pointer round trip, so multiple aliasing `&mut
    /// StreamChunkMut` can be live at once. This appears to rely on each
    /// `OpRowMutRef` only touching its own row's op/vis state — soundness
    /// should be confirmed (consider `UnsafeCell`/split borrows instead).
    pub fn to_rows_mut(&mut self) -> impl Iterator<Item = (RowRef<'_>, OpRowMutRef<'_>)> {
        unsafe {
            (0..self.vis.len())
                .filter(|i| self.vis.is_set(*i))
                .map(|i| {
                    let p = self as *const StreamChunkMut;
                    let p = p as *mut StreamChunkMut;
                    (
                        RowRef::with_columns(self.columns(), i),
                        OpRowMutRef { c: &mut *p, i },
                    )
                })
        }
    }
}
589
#[easy_ext::ext(StreamChunkTestExt)]
impl StreamChunk {
    /// Parses a chunk from a pretty-printed test string: the first line is
    /// the column-type header (forwarded to `DataChunk::from_pretty`), and
    /// each body line starts with an op token (`+`, `-`, `U+`, `U-`)
    /// followed by the row's values.
    ///
    /// # Panics
    /// Panics on a body line without an op token or with an unknown one.
    pub fn from_pretty(s: &str) -> Self {
        let mut chunk_str = String::new();
        let mut ops = vec![];

        let (header, body) = match s.split_once('\n') {
            Some(pair) => pair,
            None => {
                // Header only: a chunk with column types but no rows.
                return StreamChunk {
                    ops: Arc::new([]),
                    data: DataChunk::from_pretty(s),
                };
            }
        };
        chunk_str.push_str(header);
        chunk_str.push('\n');

        for line in body.split_inclusive('\n') {
            if line.trim_start().is_empty() {
                continue;
            }
            // Strip the leading op token; the rest of the line (including
            // its trailing newline, thanks to `split_inclusive`) is fed to
            // the DataChunk parser.
            let (op, row) = line
                .trim_start()
                .split_once(|c: char| c.is_ascii_whitespace())
                .ok_or_else(|| panic!("missing operation: {line:?}"))
                .unwrap();
            ops.push(match op {
                "+" => Op::Insert,
                "-" => Op::Delete,
                "U+" => Op::UpdateInsert,
                "U-" => Op::UpdateDelete,
                t => panic!("invalid op: {t:?}"),
            });
            chunk_str.push_str(row);
        }
        StreamChunk {
            ops: ops.into(),
            data: DataChunk::from_pretty(&chunk_str),
        }
    }

    /// Checks the internal invariant: ops, visibility, and every column all
    /// have the same length.
    pub fn valid(&self) -> bool {
        let len = self.ops.len();
        let data = &self.data;
        data.visibility().len() == len && data.columns().iter().all(|col| col.len() == len)
    }

    /// Concatenates multiple chunks into one, keeping only visible rows.
    ///
    /// # Panics
    /// Panics if `chunks` is empty (the first chunk supplies the data types)
    /// or if the result would have no rows.
    pub fn concat(chunks: Vec<StreamChunk>) -> StreamChunk {
        let data_types = chunks[0].data_types();
        let size = chunks.iter().map(|c| c.cardinality()).sum::<usize>();

        let mut builder = StreamChunkBuilder::unlimited(data_types, Some(size));

        for chunk in chunks {
            for (op, row) in chunk.rows() {
                let none = builder.append_row(op, row);
                // Unlimited builder never emits an intermediate chunk.
                debug_assert!(none.is_none());
            }
        }

        builder.take().expect("chunk should not be empty")
    }

    /// Reorders rows by `(op, row values)` so chunks can be compared
    /// regardless of their original row order.
    pub fn sort_rows(self) -> Self {
        if self.capacity() == 0 {
            return self;
        }
        let rows = self.rows().collect_vec();
        let mut idx = (0..self.capacity()).collect_vec();
        idx.sort_by_key(|&i| {
            let (op, row_ref) = rows[i];
            (op, DefaultOrdered(row_ref))
        });
        StreamChunk {
            ops: idx.iter().map(|&i| self.ops[i]).collect(),
            data: self.data.reorder_rows(&idx),
        }
    }

    /// Generates `num_of_chunks` random chunks of `chunk_size` rows each,
    /// fully visible and insert-only.
    pub fn gen_stream_chunks(
        num_of_chunks: usize,
        chunk_size: usize,
        data_types: &[DataType],
        varchar_properties: &VarcharProperty,
    ) -> Vec<StreamChunk> {
        Self::gen_stream_chunks_inner(
            num_of_chunks,
            chunk_size,
            data_types,
            varchar_properties,
            1.0,
            1.0,
        )
    }

    /// Like [`Self::gen_stream_chunks`], with control over the fraction of
    /// visible rows and of `Insert` ops (the rest are `Delete`). The op
    /// pattern is deterministic: a fixed-seed `SmallRng`, reused for every
    /// chunk.
    pub fn gen_stream_chunks_inner(
        num_of_chunks: usize,
        chunk_size: usize,
        data_types: &[DataType],
        varchar_properties: &VarcharProperty,
        visibility_percent: f64, inserts_percent: f64, ) -> Vec<StreamChunk> {
        let ops = if inserts_percent == 0.0 {
            vec![Op::Delete; chunk_size]
        } else if inserts_percent == 1.0 {
            vec![Op::Insert; chunk_size]
        } else {
            // Fixed seed keeps generated data reproducible across runs.
            let mut rng = SmallRng::from_seed([0; 32]);
            let mut ops = vec![];
            for _ in 0..chunk_size {
                ops.push(if rng.random_bool(inserts_percent) {
                    Op::Insert
                } else {
                    Op::Delete
                });
            }
            ops
        };
        DataChunk::gen_data_chunks(
            num_of_chunks,
            chunk_size,
            data_types,
            varchar_properties,
            visibility_percent,
        )
        .into_iter()
        .map(|chunk| StreamChunk::from_parts(ops.clone(), chunk))
        .collect()
    }
}
773
#[cfg(test)]
mod tests {
    use super::*;

    // Golden test: pretty-printing renders one table row per chunk row with
    // the op symbol right-aligned in the first column and NULLs as blanks.
    #[test]
    fn test_to_pretty_string() {
        let chunk = StreamChunk::from_pretty(
            " I I
            + 1 6
            - 2 .
            U- 3 7
            U+ 4 .",
        );
        assert_eq!(
            chunk.to_pretty().to_string(),
            "\
+----+---+---+
| + | 1 | 6 |
| - | 2 | |
| U- | 3 | 7 |
| U+ | 4 | |
+----+---+---+"
        );
    }

    // split(2) on 4 rows yields two full chunks, preserving row order.
    #[test]
    fn test_split_1() {
        let chunk = StreamChunk::from_pretty(
            " I I
            + 1 6
            - 2 .
            U- 3 7
            U+ 4 .",
        );
        let results = chunk.split(2);
        assert_eq!(2, results.len());
        assert_eq!(
            results[0].to_pretty().to_string(),
            "\
+---+---+---+
| + | 1 | 6 |
| - | 2 | |
+---+---+---+"
        );
        assert_eq!(
            results[1].to_pretty().to_string(),
            "\
+----+---+---+
| U- | 3 | 7 |
| U+ | 4 | |
+----+---+---+"
        );
    }

    // split(2) on 4 rows where the sizes don't divide the U-/U+ pair across
    // a boundary cleanly: presumably the builder avoids splitting an update
    // pair, producing a 3-row chunk then a 1-row chunk — see
    // StreamChunkBuilder for the exact rule.
    #[test]
    fn test_split_2() {
        let chunk = StreamChunk::from_pretty(
            " I I
            + 1 6
            U- 3 7
            U+ 4 .
            - 2 .",
        );
        let results = chunk.split(2);
        assert_eq!(2, results.len());
        assert_eq!(
            results[0].to_pretty().to_string(),
            "\
+----+---+---+
| + | 1 | 6 |
| U- | 3 | 7 |
| U+ | 4 | |
+----+---+---+"
        );
        assert_eq!(
            results[1].to_pretty().to_string(),
            "\
+---+---+---+
| - | 2 | |
+---+---+---+"
        );
    }

    // Adjacent (delete-like, insert-like) pairs with equal values are hidden;
    // the trailing `D` presumably marks a row as invisible in the test DSL
    // (handled by DataChunk::from_pretty) — invisible rows never pair up.
    #[test]
    fn test_eliminate_adjacent_noop_update() {
        let c = StreamChunk::from_pretty(
            " I I
            - 1 6 D
            - 2 2
            + 2 3
            - 2 3
            + 1 6
            - 1 7
            + 1 10 D
            + 1 7
            U- 3 7
            U+ 3 7
            + 2 3",
        );
        let c = c.eliminate_adjacent_noop_update();
        assert_eq!(
            c.to_pretty().to_string(),
            "\
+---+---+---+
| - | 2 | 2 |
| + | 2 | 3 |
| - | 2 | 3 |
| + | 1 | 6 |
| + | 2 | 3 |
+---+---+---+"
        );
    }
}
886}