risingwave_common/array/
stream_chunk.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Display;
16use std::mem::size_of;
17use std::ops::{Deref, DerefMut};
18use std::sync::Arc;
19use std::{fmt, mem};
20
21use either::Either;
22use enum_as_inner::EnumAsInner;
23use itertools::Itertools;
24use rand::prelude::SmallRng;
25use rand::{Rng, SeedableRng};
26use risingwave_common_estimate_size::EstimateSize;
27use risingwave_pb::data::{PbOp, PbStreamChunk};
28
29use super::stream_chunk_builder::StreamChunkBuilder;
30use super::{ArrayImpl, ArrayRef, ArrayResult, DataChunkTestExt, RowRef};
31use crate::array::DataChunk;
32use crate::bitmap::{Bitmap, BitmapBuilder};
33use crate::catalog::Schema;
34use crate::field_generator::VarcharProperty;
35use crate::row::Row;
36use crate::types::{DataType, DefaultOrdered, ToText};
37
/// `Op` represents three operations in `StreamChunk`.
///
/// `UpdateDelete` and `UpdateInsert` are semantically equivalent to `Delete` and `Insert`
/// but always appear in pairs to represent an update operation.
/// For example, table source, aggregation and outer join can generate updates by themselves,
/// while most of the other operators only pass through updates with best effort.
#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash, EnumAsInner)]
pub enum Op {
    /// Insert a new row.
    Insert,
    /// Delete an existing row.
    Delete,
    /// Delete the old value of an updated row; always paired with a following `UpdateInsert`.
    UpdateDelete,
    /// Insert the new value of an updated row; always paired with a preceding `UpdateDelete`.
    UpdateInsert,
}
51
52impl Op {
53    pub fn to_protobuf(self) -> PbOp {
54        match self {
55            Op::Insert => PbOp::Insert,
56            Op::Delete => PbOp::Delete,
57            Op::UpdateInsert => PbOp::UpdateInsert,
58            Op::UpdateDelete => PbOp::UpdateDelete,
59        }
60    }
61
62    pub fn from_protobuf(prost: &i32) -> ArrayResult<Op> {
63        let op = match PbOp::try_from(*prost) {
64            Ok(PbOp::Insert) => Op::Insert,
65            Ok(PbOp::Delete) => Op::Delete,
66            Ok(PbOp::UpdateInsert) => Op::UpdateInsert,
67            Ok(PbOp::UpdateDelete) => Op::UpdateDelete,
68            Ok(PbOp::Unspecified) => unreachable!(),
69            Err(_) => bail!("No such op type"),
70        };
71        Ok(op)
72    }
73
74    /// convert `UpdateDelete` to `Delete` and `UpdateInsert` to Insert
75    pub fn normalize_update(self) -> Op {
76        match self {
77            Op::Insert => Op::Insert,
78            Op::Delete => Op::Delete,
79            Op::UpdateDelete => Op::Delete,
80            Op::UpdateInsert => Op::Insert,
81        }
82    }
83
84    pub fn to_i16(self) -> i16 {
85        match self {
86            Op::Insert => 1,
87            Op::Delete => 2,
88            Op::UpdateInsert => 3,
89            Op::UpdateDelete => 4,
90        }
91    }
92
93    pub fn to_varchar(self) -> String {
94        match self {
95            Op::Insert => "Insert",
96            Op::Delete => "Delete",
97            Op::UpdateInsert => "UpdateInsert",
98            Op::UpdateDelete => "UpdateDelete",
99        }
100        .to_owned()
101    }
102}
103
/// `StreamChunk` is used to pass data over the streaming pathway.
#[derive(Clone, PartialEq)]
pub struct StreamChunk {
    // TODO: Optimize using bitmap
    // One op per row; `with_visibility` asserts its length equals every column's length.
    ops: Arc<[Op]>,
    // The underlying columns plus the visibility map.
    data: DataChunk,
}
111
112impl Default for StreamChunk {
113    /// Create a 0-row-0-col `StreamChunk`. Only used in some existing tests.
114    /// This is NOT the same as an **empty** chunk, which has 0 rows but with
115    /// columns aligned with executor schema.
116    fn default() -> Self {
117        Self {
118            ops: Arc::new([]),
119            data: DataChunk::new(vec![], 0),
120        }
121    }
122}
123
impl StreamChunk {
    /// Create a new `StreamChunk` with given ops and columns.
    ///
    /// All rows are visible. Panics (via `with_visibility`) if any column's
    /// length differs from `ops.len()`.
    pub fn new(ops: impl Into<Arc<[Op]>>, columns: Vec<ArrayRef>) -> Self {
        let ops = ops.into();
        let visibility = Bitmap::ones(ops.len());
        Self::with_visibility(ops, columns, visibility)
    }

    /// Create a new `StreamChunk` with given ops, columns and visibility.
    ///
    /// Panics if any column's length differs from `ops.len()`.
    pub fn with_visibility(
        ops: impl Into<Arc<[Op]>>,
        columns: Vec<ArrayRef>,
        visibility: Bitmap,
    ) -> Self {
        let ops = ops.into();
        // Invariant: every column carries exactly one value per op.
        for col in &columns {
            assert_eq!(col.len(), ops.len());
        }
        let data = DataChunk::new(columns, visibility);
        StreamChunk { ops, data }
    }

    /// Build a `StreamChunk` from rows.
    ///
    /// Panics if the `rows` is empty.
    ///
    /// Should prefer using [`StreamChunkBuilder`] instead to avoid unnecessary
    /// allocation of rows.
    pub fn from_rows(rows: &[(Op, impl Row)], data_types: &[DataType]) -> Self {
        let mut builder = StreamChunkBuilder::unlimited(data_types.to_vec(), Some(rows.len()));

        for (op, row) in rows {
            // The builder is unlimited, so appending never yields a finished chunk here.
            let none = builder.append_row(*op, row);
            debug_assert!(none.is_none());
        }

        builder.take().expect("chunk should not be empty")
    }

    /// Build an **empty** chunk: 0 rows, but with columns aligned with `data_types`.
    pub fn empty(data_types: &[DataType]) -> Self {
        StreamChunkBuilder::build_empty(data_types.to_vec())
    }

    /// Get the reference of the underlying data chunk.
    pub fn data_chunk(&self) -> &DataChunk {
        &self.data
    }

    /// compact the `StreamChunk` with its visibility map
    ///
    /// Invisible rows are physically removed; the result has all rows visible.
    pub fn compact(self) -> Self {
        if self.is_compacted() {
            return self;
        }

        let (ops, columns, visibility) = self.into_inner();

        // Number of visible rows = number of set bits in the visibility map.
        let cardinality = visibility
            .iter()
            .fold(0, |vis_cnt, vis| vis_cnt + vis as usize);
        let columns: Vec<_> = columns
            .into_iter()
            .map(|col| col.compact(&visibility, cardinality).into())
            .collect();
        // Keep only the ops of visible rows, preserving order.
        let mut new_ops = Vec::with_capacity(cardinality);
        for idx in visibility.iter_ones() {
            new_ops.push(ops[idx]);
        }
        StreamChunk::new(new_ops, columns)
    }

    /// Split the `StreamChunk` into multiple chunks with the given size at most.
    ///
    /// When the total cardinality of all the chunks is not evenly divided by the `size`,
    /// the last new chunk will be the remainder.
    ///
    /// For consecutive `UpdateDelete` and `UpdateInsert`, they will be kept in one chunk.
    /// As a result, some chunks may have `size + 1` rows.
    pub fn split(&self, size: usize) -> Vec<Self> {
        let mut builder = StreamChunkBuilder::new(size, self.data_types());
        let mut outputs = Vec::new();

        // TODO: directly append the chunk.
        for (op, row) in self.rows() {
            // The builder emits a chunk whenever it reaches `size` rows
            // (keeping U-/U+ pairs together).
            if let Some(chunk) = builder.append_row(op, row) {
                outputs.push(chunk);
            }
        }
        // Flush the remainder, if any.
        if let Some(output) = builder.take() {
            outputs.push(output);
        }

        outputs
    }

    /// Decompose into the underlying data chunk and the op slice.
    pub fn into_parts(self) -> (DataChunk, Arc<[Op]>) {
        (self.data, self.ops)
    }

    /// Inverse of [`Self::into_parts`]: reassemble from ops and a data chunk.
    pub fn from_parts(ops: impl Into<Arc<[Op]>>, data_chunk: DataChunk) -> Self {
        let (columns, vis) = data_chunk.into_parts();
        Self::with_visibility(ops, columns, vis)
    }

    /// Decompose into ops, columns and the visibility map.
    pub fn into_inner(self) -> (Arc<[Op]>, Vec<ArrayRef>, Bitmap) {
        let (columns, vis) = self.data.into_parts();
        (self.ops, columns, vis)
    }

    pub fn to_protobuf(&self) -> PbStreamChunk {
        // Compact first so that invisible rows are never serialized.
        if !self.is_compacted() {
            return self.clone().compact().to_protobuf();
        }
        PbStreamChunk {
            cardinality: self.cardinality() as u32,
            ops: self.ops.iter().map(|op| op.to_protobuf() as i32).collect(),
            columns: self.columns().iter().map(|col| col.to_protobuf()).collect(),
        }
    }

    /// Decode a `StreamChunk` from its protobuf representation.
    pub fn from_protobuf(prost: &PbStreamChunk) -> ArrayResult<Self> {
        let cardinality = prost.get_cardinality() as usize;
        let mut ops = Vec::with_capacity(cardinality);
        for op in prost.get_ops() {
            ops.push(Op::from_protobuf(op)?);
        }
        let mut columns = vec![];
        for column in prost.get_columns() {
            columns.push(ArrayImpl::from_protobuf(column, cardinality)?.into());
        }
        Ok(StreamChunk::new(ops, columns))
    }

    /// The per-row ops, including those of invisible rows.
    pub fn ops(&self) -> &[Op] {
        &self.ops
    }

    /// Returns a table-like text representation of the `StreamChunk`.
    pub fn to_pretty(&self) -> impl Display + use<> {
        self.to_pretty_inner(None)
    }

    /// Returns a table-like text representation of the `StreamChunk` with a header of column names
    /// from the given `schema`.
    pub fn to_pretty_with_schema(&self, schema: &Schema) -> impl Display + use<> {
        self.to_pretty_inner(Some(schema))
    }

    /// Shared implementation of [`Self::to_pretty`] and [`Self::to_pretty_with_schema`].
    /// Only visible rows are rendered.
    fn to_pretty_inner(&self, schema: Option<&Schema>) -> impl Display + use<> {
        use comfy_table::{Cell, CellAlignment, Table};

        if self.cardinality() == 0 {
            return Either::Left("(empty)");
        }

        let mut table = Table::new();
        table.load_preset(DataChunk::PRETTY_TABLE_PRESET);

        if let Some(schema) = schema {
            assert_eq!(self.dimension(), schema.len());
            // The leading empty cell aligns the header with the op column below.
            let cells = std::iter::once(String::new())
                .chain(schema.fields().iter().map(|f| f.name.clone()));
            table.set_header(cells);
        }

        for (op, row_ref) in self.rows() {
            let mut cells = Vec::with_capacity(row_ref.len() + 1);
            cells.push(
                Cell::new(match op {
                    Op::Insert => "+",
                    Op::Delete => "-",
                    Op::UpdateDelete => "U-",
                    Op::UpdateInsert => "U+",
                })
                .set_alignment(CellAlignment::Right),
            );
            for datum in row_ref.iter() {
                let str = match datum {
                    None => "".to_owned(), // NULL
                    Some(scalar) => scalar.to_text(),
                };
                cells.push(Cell::new(str));
            }
            table.add_row(cells);
        }

        Either::Right(table)
    }

    /// Reorder (and possibly remove) columns.
    ///
    /// e.g. if `indices` is `[2, 1, 0]`, and the chunk contains column `[a, b, c]`, then the output
    /// will be `[c, b, a]`. If `indices` is [2, 0], then the output will be `[c, a]`.
    /// If the input mapping is identity mapping, no reorder will be performed.
    pub fn project(&self, indices: &[usize]) -> Self {
        Self {
            // Ops are per-row, so they are unaffected by column projection.
            ops: self.ops.clone(),
            data: self.data.project(indices),
        }
    }

    /// Remove the adjacent delete-insert if their row value are the same.
    pub fn eliminate_adjacent_noop_update(self) -> Self {
        let len = self.data_chunk().capacity();
        let mut c: StreamChunkMut = self.into();
        // Index of the previous visible row that may still form a
        // delete-insert pair with the current row.
        let mut prev_r = None;
        for curr in 0..len {
            if !c.vis(curr) {
                continue;
            }
            if let Some(prev) = prev_r {
                // A (delete, insert) pair carrying identical row values is a
                // no-op update: hide both rows.
                if matches!(c.op(prev), Op::UpdateDelete | Op::Delete)
                    && matches!(c.op(curr), Op::UpdateInsert | Op::Insert)
                    && c.row_ref(prev) == c.row_ref(curr)
                {
                    c.set_vis(prev, false);
                    c.set_vis(curr, false);
                    prev_r = None;
                } else {
                    prev_r = Some(curr)
                }
            } else {
                prev_r = Some(curr);
            }
        }
        c.into()
    }

    /// Reorder columns and set visibility.
    pub fn project_with_vis(&self, indices: &[usize], vis: Bitmap) -> Self {
        Self {
            ops: self.ops.clone(),
            data: self.data.project_with_vis(indices, vis),
        }
    }

    /// Clone the `StreamChunk` with a new visibility.
    pub fn clone_with_vis(&self, vis: Bitmap) -> Self {
        Self {
            ops: self.ops.clone(),
            data: self.data.with_visibility(vis),
        }
    }

    /// Compute the required permits of this chunk for rate limiting.
    /// Currently this is simply the chunk's capacity.
    pub fn compute_rate_limit_chunk_permits(&self) -> u64 {
        self.capacity() as _
    }
}
372
// Dereferences to the underlying `DataChunk`, so its methods (e.g. `capacity`,
// `cardinality`, `columns`) can be called on a `StreamChunk` directly.
impl Deref for StreamChunk {
    type Target = DataChunk;

    fn deref(&self) -> &Self::Target {
        &self.data
    }
}
380
// Mutable counterpart of the `Deref` impl above.
impl DerefMut for StreamChunk {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.data
    }
}
386
387/// `StreamChunk` can be created from `DataChunk` with all operations set to `Insert`.
388impl From<DataChunk> for StreamChunk {
389    fn from(data: DataChunk) -> Self {
390        Self::from_parts(vec![Op::Insert; data.capacity()], data)
391    }
392}
393
394impl fmt::Debug for StreamChunk {
395    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
396        if f.alternate() {
397            write!(
398                f,
399                "StreamChunk {{ cardinality: {}, capacity: {}, data:\n{}\n }}",
400                self.cardinality(),
401                self.capacity(),
402                self.to_pretty()
403            )
404        } else {
405            f.debug_struct("StreamChunk")
406                .field("cardinality", &self.cardinality())
407                .field("capacity", &self.capacity())
408                .finish_non_exhaustive()
409        }
410    }
411}
412
413impl EstimateSize for StreamChunk {
414    fn estimated_heap_size(&self) -> usize {
415        self.data.estimated_heap_size() + self.ops.len() * size_of::<Op>()
416    }
417}
418
// Internal representation of `OpsMut`: either the original shared slice
// (before any write) or an owned copy (after the first write).
enum OpsMutState {
    ArcRef(Arc<[Op]>),
    Mut(Vec<Op>),
}
423
impl OpsMutState {
    // Throwaway placeholder used only while swapping states in `OpsMut::set`;
    // an empty `Vec` does not allocate, so it is free to construct.
    const UNDEFINED: Self = Self::Mut(Vec::new());
}
427
/// A copy-on-write wrapper around a shared op slice: reads go through the
/// shared slice until the first write, which copies it into an owned `Vec`.
pub struct OpsMut {
    state: OpsMutState,
}
431
432impl OpsMut {
433    pub fn new(ops: Arc<[Op]>) -> Self {
434        Self {
435            state: OpsMutState::ArcRef(ops),
436        }
437    }
438
439    pub fn len(&self) -> usize {
440        match &self.state {
441            OpsMutState::ArcRef(v) => v.len(),
442            OpsMutState::Mut(v) => v.len(),
443        }
444    }
445
446    pub fn is_empty(&self) -> bool {
447        self.len() == 0
448    }
449
450    pub fn set(&mut self, n: usize, val: Op) {
451        debug_assert!(n < self.len());
452        if let OpsMutState::Mut(v) = &mut self.state {
453            v[n] = val;
454        } else {
455            let state = mem::replace(&mut self.state, OpsMutState::UNDEFINED); // intermediate state
456            let mut v = match state {
457                OpsMutState::ArcRef(v) => v.to_vec(),
458                OpsMutState::Mut(_) => unreachable!(),
459            };
460            v[n] = val;
461            self.state = OpsMutState::Mut(v);
462        }
463    }
464
465    pub fn get(&self, n: usize) -> Op {
466        debug_assert!(n < self.len());
467        match &self.state {
468            OpsMutState::ArcRef(v) => v[n],
469            OpsMutState::Mut(v) => v[n],
470        }
471    }
472}
473impl From<OpsMut> for Arc<[Op]> {
474    fn from(v: OpsMut) -> Self {
475        match v.state {
476            OpsMutState::ArcRef(a) => a,
477            OpsMutState::Mut(v) => v.into(),
478        }
479    }
480}
481
/// A mutable wrapper for `StreamChunk`: can only set the visibilities and ops in place;
/// cannot change the length.
pub struct StreamChunkMut {
    // Column arrays are shared and never modified through this wrapper.
    columns: Arc<[ArrayRef]>,
    // Per-row ops, copy-on-write.
    ops: OpsMut,
    // Visibility bits, mutable in place.
    vis: BitmapBuilder,
}
489
490impl From<StreamChunk> for StreamChunkMut {
491    fn from(c: StreamChunk) -> Self {
492        let (c, ops) = c.into_parts();
493        let (columns, vis) = c.into_parts_v2();
494        Self {
495            columns,
496            ops: OpsMut::new(ops),
497            vis: vis.into(),
498        }
499    }
500}
501
502impl From<StreamChunkMut> for StreamChunk {
503    fn from(c: StreamChunkMut) -> Self {
504        StreamChunk::from_parts(c.ops, DataChunk::from_parts(c.columns, c.vis.finish()))
505    }
506}
507
/// A mutable reference to a single row (its op and visibility) of a `StreamChunkMut`.
pub struct OpRowMutRef<'a> {
    // The chunk this row belongs to.
    c: &'a mut StreamChunkMut,
    // Row index within the chunk.
    i: usize,
}
512
513impl OpRowMutRef<'_> {
514    pub fn index(&self) -> usize {
515        self.i
516    }
517
518    pub fn vis(&self) -> bool {
519        self.c.vis.is_set(self.i)
520    }
521
522    pub fn op(&self) -> Op {
523        self.c.ops.get(self.i)
524    }
525
526    pub fn set_vis(&mut self, val: bool) {
527        self.c.set_vis(self.i, val);
528    }
529
530    pub fn set_op(&mut self, val: Op) {
531        self.c.set_op(self.i, val);
532    }
533
534    pub fn row_ref(&self) -> RowRef<'_> {
535        RowRef::with_columns(self.c.columns(), self.i)
536    }
537
538    /// return if the two row ref is in the same chunk
539    pub fn same_chunk(&self, other: &Self) -> bool {
540        std::ptr::eq(self.c, other.c)
541    }
542}
543
impl StreamChunkMut {
    /// Total number of rows, including invisible ones.
    pub fn capacity(&self) -> usize {
        self.vis.len()
    }

    /// Whether row `i` is visible.
    pub fn vis(&self, i: usize) -> bool {
        self.vis.is_set(i)
    }

    /// The op of row `i`.
    pub fn op(&self, i: usize) -> Op {
        self.ops.get(i)
    }

    /// Borrow the data of row `i`.
    pub fn row_ref(&self, i: usize) -> RowRef<'_> {
        RowRef::with_columns(self.columns(), i)
    }

    /// Set the visibility of row `n`.
    pub fn set_vis(&mut self, n: usize, val: bool) {
        self.vis.set(n, val);
    }

    /// Set the op of row `n` (copy-on-write on first call, see `OpsMut`).
    pub fn set_op(&mut self, n: usize, val: Op) {
        self.ops.set(n, val);
    }

    pub fn columns(&self) -> &[ArrayRef] {
        &self.columns
    }

    /// get the mut reference of the stream chunk.
    ///
    /// Iterates over the *visible* rows only, yielding a borrowed data row
    /// together with a mutable handle to its op/visibility.
    pub fn to_rows_mut(&mut self) -> impl Iterator<Item = (RowRef<'_>, OpRowMutRef<'_>)> {
        // SAFETY(review): each yielded `OpRowMutRef` holds a `&mut self`
        // recreated from a raw pointer, so multiple aliasing mutable
        // references to `self` can be live at once. This looks sound only
        // because `OpRowMutRef` mutates just the per-row op/visibility and the
        // `RowRef` side reads only the immutable column arrays — TODO confirm
        // this invariant is upheld by all callers.
        unsafe {
            (0..self.vis.len())
                .filter(|i| self.vis.is_set(*i))
                .map(|i| {
                    let p = self as *const StreamChunkMut;
                    let p = p as *mut StreamChunkMut;
                    (
                        RowRef::with_columns(self.columns(), i),
                        OpRowMutRef { c: &mut *p, i },
                    )
                })
        }
    }
}
589
590/// Test utilities for [`StreamChunk`].
591#[easy_ext::ext(StreamChunkTestExt)]
592impl StreamChunk {
593    /// Parse a chunk from string.
594    ///
595    /// See also [`DataChunkTestExt::from_pretty`].
596    ///
597    /// # Format
598    ///
599    /// The first line is a header indicating the column types.
600    /// The following lines indicate rows within the chunk.
601    /// Each line starts with an operation followed by values.
602    /// NULL values are represented as `.`.
603    ///
604    /// # Example
605    /// ```
606    /// use risingwave_common::array::StreamChunk;
607    /// use risingwave_common::array::stream_chunk::StreamChunkTestExt as _;
608    /// let chunk = StreamChunk::from_pretty(
609    ///     "  I I I I      // type chars
610    ///     U- 2 5 . .      // '.' means NULL
611    ///     U+ 2 5 2 6 D    // 'D' means deleted in visibility
612    ///     +  . . 4 8      // ^ comments are ignored
613    ///     -  . . 3 4",
614    /// );
615    /// //  ^ operations:
616    /// //     +: Insert
617    /// //     -: Delete
618    /// //    U+: UpdateInsert
619    /// //    U-: UpdateDelete
620    ///
621    /// // type chars:
622    /// //     I: i64
623    /// //     i: i32
624    /// //     F: f64
625    /// //     f: f32
626    /// //     T: str
627    /// //    TS: Timestamp
628    /// //    TZ: Timestamptz
629    /// //   SRL: Serial
630    /// //   x[]: array of x
631    /// // <i,f>: struct
632    /// ```
633    pub fn from_pretty(s: &str) -> Self {
634        let mut chunk_str = String::new();
635        let mut ops = vec![];
636
637        let (header, body) = match s.split_once('\n') {
638            Some(pair) => pair,
639            None => {
640                // empty chunk
641                return StreamChunk {
642                    ops: Arc::new([]),
643                    data: DataChunk::from_pretty(s),
644                };
645            }
646        };
647        chunk_str.push_str(header);
648        chunk_str.push('\n');
649
650        for line in body.split_inclusive('\n') {
651            if line.trim_start().is_empty() {
652                continue;
653            }
654            let (op, row) = line
655                .trim_start()
656                .split_once(|c: char| c.is_ascii_whitespace())
657                .ok_or_else(|| panic!("missing operation: {line:?}"))
658                .unwrap();
659            ops.push(match op {
660                "+" => Op::Insert,
661                "-" => Op::Delete,
662                "U+" => Op::UpdateInsert,
663                "U-" => Op::UpdateDelete,
664                t => panic!("invalid op: {t:?}"),
665            });
666            chunk_str.push_str(row);
667        }
668        StreamChunk {
669            ops: ops.into(),
670            data: DataChunk::from_pretty(&chunk_str),
671        }
672    }
673
674    /// Validate the `StreamChunk` layout.
675    pub fn valid(&self) -> bool {
676        let len = self.ops.len();
677        let data = &self.data;
678        data.visibility().len() == len && data.columns().iter().all(|col| col.len() == len)
679    }
680
681    /// Concatenate multiple `StreamChunk` into one.
682    ///
683    /// Panics if `chunks` is empty.
684    pub fn concat(chunks: Vec<StreamChunk>) -> StreamChunk {
685        let data_types = chunks[0].data_types();
686        let size = chunks.iter().map(|c| c.cardinality()).sum::<usize>();
687
688        let mut builder = StreamChunkBuilder::unlimited(data_types, Some(size));
689
690        for chunk in chunks {
691            // TODO: directly append chunks.
692            for (op, row) in chunk.rows() {
693                let none = builder.append_row(op, row);
694                debug_assert!(none.is_none());
695            }
696        }
697
698        builder.take().expect("chunk should not be empty")
699    }
700
701    /// Sort rows.
702    pub fn sort_rows(self) -> Self {
703        if self.capacity() == 0 {
704            return self;
705        }
706        let rows = self.rows().collect_vec();
707        let mut idx = (0..self.capacity()).collect_vec();
708        idx.sort_by_key(|&i| {
709            let (op, row_ref) = rows[i];
710            (op, DefaultOrdered(row_ref))
711        });
712        StreamChunk {
713            ops: idx.iter().map(|&i| self.ops[i]).collect(),
714            data: self.data.reorder_rows(&idx),
715        }
716    }
717
718    /// Generate `num_of_chunks` data chunks with type `data_types`,
719    /// where each data chunk has cardinality of `chunk_size`.
720    /// TODO(kwannoel): Generate different types of op, different vis.
721    pub fn gen_stream_chunks(
722        num_of_chunks: usize,
723        chunk_size: usize,
724        data_types: &[DataType],
725        varchar_properties: &VarcharProperty,
726    ) -> Vec<StreamChunk> {
727        Self::gen_stream_chunks_inner(
728            num_of_chunks,
729            chunk_size,
730            data_types,
731            varchar_properties,
732            1.0,
733            1.0,
734        )
735    }
736
737    pub fn gen_stream_chunks_inner(
738        num_of_chunks: usize,
739        chunk_size: usize,
740        data_types: &[DataType],
741        varchar_properties: &VarcharProperty,
742        visibility_percent: f64, // % of rows that are visible
743        inserts_percent: f64,    // Rest will be deletes.
744    ) -> Vec<StreamChunk> {
745        let ops = if inserts_percent == 0.0 {
746            vec![Op::Delete; chunk_size]
747        } else if inserts_percent == 1.0 {
748            vec![Op::Insert; chunk_size]
749        } else {
750            let mut rng = SmallRng::from_seed([0; 32]);
751            let mut ops = vec![];
752            for _ in 0..chunk_size {
753                ops.push(if rng.random_bool(inserts_percent) {
754                    Op::Insert
755                } else {
756                    Op::Delete
757                });
758            }
759            ops
760        };
761        DataChunk::gen_data_chunks(
762            num_of_chunks,
763            chunk_size,
764            data_types,
765            varchar_properties,
766            visibility_percent,
767        )
768        .into_iter()
769        .map(|chunk| StreamChunk::from_parts(ops.clone(), chunk))
770        .collect()
771    }
772}
773
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_to_pretty_string() {
        let chunk = StreamChunk::from_pretty(
            "  I I
             + 1 6
             - 2 .
            U- 3 7
            U+ 4 .",
        );
        // NULLs (`.`) render as empty cells; ops are right-aligned in the first column.
        assert_eq!(
            chunk.to_pretty().to_string(),
            "\
+----+---+---+
|  + | 1 | 6 |
|  - | 2 |   |
| U- | 3 | 7 |
| U+ | 4 |   |
+----+---+---+"
        );
    }

    #[test]
    fn test_split_1() {
        let chunk = StreamChunk::from_pretty(
            "  I I
             + 1 6
             - 2 .
            U- 3 7
            U+ 4 .",
        );
        // 4 rows split evenly into two chunks of 2; the U-/U+ pair lands at a
        // chunk boundary, so no chunk needs to exceed the requested size.
        let results = chunk.split(2);
        assert_eq!(2, results.len());
        assert_eq!(
            results[0].to_pretty().to_string(),
            "\
+---+---+---+
| + | 1 | 6 |
| - | 2 |   |
+---+---+---+"
        );
        assert_eq!(
            results[1].to_pretty().to_string(),
            "\
+----+---+---+
| U- | 3 | 7 |
| U+ | 4 |   |
+----+---+---+"
        );
    }

    #[test]
    fn test_split_2() {
        let chunk = StreamChunk::from_pretty(
            "  I I
             + 1 6
            U- 3 7
            U+ 4 .
             - 2 .",
        );
        // The U-/U+ pair must not be split across chunks, so the first chunk
        // gets `size + 1` = 3 rows and the remainder chunk gets 1.
        let results = chunk.split(2);
        assert_eq!(2, results.len());
        assert_eq!(
            results[0].to_pretty().to_string(),
            "\
+----+---+---+
|  + | 1 | 6 |
| U- | 3 | 7 |
| U+ | 4 |   |
+----+---+---+"
        );
        assert_eq!(
            results[1].to_pretty().to_string(),
            "\
+---+---+---+
| - | 2 |   |
+---+---+---+"
        );
    }

    #[test]
    fn test_eliminate_adjacent_noop_update() {
        // Rows marked `D` are invisible and must be skipped when pairing.
        let c = StreamChunk::from_pretty(
            "  I I
            - 1 6 D
            - 2 2
            + 2 3
            - 2 3
            + 1 6
            - 1 7
            + 1 10 D
            + 1 7
            U- 3 7
            U+ 3 7
            + 2 3",
        );
        // `(- 1 7, + 1 7)` and `(U- 3 7, U+ 3 7)` are no-op pairs and get hidden.
        let c = c.eliminate_adjacent_noop_update();
        assert_eq!(
            c.to_pretty().to_string(),
            "\
+---+---+---+
| - | 2 | 2 |
| + | 2 | 3 |
| - | 2 | 3 |
| + | 1 | 6 |
| + | 2 | 3 |
+---+---+---+"
        );
    }
}