// risingwave_common/array/stream_chunk.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Display;
16use std::mem::size_of;
17use std::ops::{Deref, DerefMut};
18use std::sync::Arc;
19use std::{fmt, mem};
20
21use either::Either;
22use enum_as_inner::EnumAsInner;
23use itertools::Itertools;
24use rand::prelude::SmallRng;
25use rand::{Rng, SeedableRng};
26use risingwave_common_estimate_size::EstimateSize;
27use risingwave_pb::data::{PbOp, PbStreamChunk};
28
29use super::stream_chunk_builder::StreamChunkBuilder;
30use super::{ArrayImpl, ArrayRef, ArrayResult, DataChunkTestExt, RowRef};
31use crate::array::DataChunk;
32use crate::bitmap::{Bitmap, BitmapBuilder};
33use crate::catalog::Schema;
34use crate::field_generator::VarcharProperty;
35use crate::row::Row;
36use crate::types::{DataType, DefaultOrdered, ToText};
37
/// `Op` represents three operations in `StreamChunk`: insert, delete and update.
///
/// `UpdateDelete` and `UpdateInsert` are semantically equivalent to `Delete` and `Insert`
/// but always appear in pairs to represent an update operation.
/// For example, table source, aggregation and outer join can generate updates by themselves,
/// while most of the other operators only pass through updates with best effort.
#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash, EnumAsInner)]
pub enum Op {
    /// Insert a new row.
    Insert,
    /// Delete an existing row.
    Delete,
    /// Delete the old value of an updated row; always paired with a following `UpdateInsert`.
    UpdateDelete,
    /// Insert the new value of an updated row; always paired with a preceding `UpdateDelete`.
    UpdateInsert,
}
51
52impl Op {
53    pub fn to_protobuf(self) -> PbOp {
54        match self {
55            Op::Insert => PbOp::Insert,
56            Op::Delete => PbOp::Delete,
57            Op::UpdateInsert => PbOp::UpdateInsert,
58            Op::UpdateDelete => PbOp::UpdateDelete,
59        }
60    }
61
62    pub fn from_protobuf(prost: &i32) -> ArrayResult<Op> {
63        let op = match PbOp::try_from(*prost) {
64            Ok(PbOp::Insert) => Op::Insert,
65            Ok(PbOp::Delete) => Op::Delete,
66            Ok(PbOp::UpdateInsert) => Op::UpdateInsert,
67            Ok(PbOp::UpdateDelete) => Op::UpdateDelete,
68            Ok(PbOp::Unspecified) => unreachable!(),
69            Err(_) => bail!("No such op type"),
70        };
71        Ok(op)
72    }
73
74    /// convert `UpdateDelete` to `Delete` and `UpdateInsert` to Insert
75    pub fn normalize_update(self) -> Op {
76        match self {
77            Op::Insert => Op::Insert,
78            Op::Delete => Op::Delete,
79            Op::UpdateDelete => Op::Delete,
80            Op::UpdateInsert => Op::Insert,
81        }
82    }
83
84    pub fn to_i16(self) -> i16 {
85        match self {
86            Op::Insert => 1,
87            Op::Delete => 2,
88            Op::UpdateInsert => 3,
89            Op::UpdateDelete => 4,
90        }
91    }
92
93    pub fn to_varchar(self) -> String {
94        match self {
95            Op::Insert => "Insert",
96            Op::Delete => "Delete",
97            Op::UpdateInsert => "UpdateInsert",
98            Op::UpdateDelete => "UpdateDelete",
99        }
100        .to_owned()
101    }
102}
103
/// `StreamChunk` is used to pass data over the streaming pathway.
#[derive(Clone, PartialEq)]
pub struct StreamChunk {
    // TODO: Optimize using bitmap
    // One op per row slot in `data` (including invisible rows); shared so
    // cloning the chunk is cheap.
    ops: Arc<[Op]>,
    // The underlying columnar rows plus the visibility bitmap.
    data: DataChunk,
}
111
112impl Default for StreamChunk {
113    /// Create a 0-row-0-col `StreamChunk`. Only used in some existing tests.
114    /// This is NOT the same as an **empty** chunk, which has 0 rows but with
115    /// columns aligned with executor schema.
116    fn default() -> Self {
117        Self {
118            ops: Arc::new([]),
119            data: DataChunk::new(vec![], 0),
120        }
121    }
122}
123
impl StreamChunk {
    /// Create a new `StreamChunk` with given ops and columns.
    ///
    /// All rows are visible. Panics (via [`Self::with_visibility`]) if any
    /// column's length differs from `ops.len()`.
    pub fn new(ops: impl Into<Arc<[Op]>>, columns: Vec<ArrayRef>) -> Self {
        let ops = ops.into();
        // Every row is visible by default.
        let visibility = Bitmap::ones(ops.len());
        Self::with_visibility(ops, columns, visibility)
    }

    /// Create a new `StreamChunk` with given ops, columns and visibility.
    ///
    /// Panics if any column's length differs from `ops.len()`.
    pub fn with_visibility(
        ops: impl Into<Arc<[Op]>>,
        columns: Vec<ArrayRef>,
        visibility: Bitmap,
    ) -> Self {
        let ops = ops.into();
        // Each column must hold exactly one value per op.
        for col in &columns {
            assert_eq!(col.len(), ops.len());
        }
        let data = DataChunk::new(columns, visibility);
        StreamChunk { ops, data }
    }

    /// Build a `StreamChunk` from rows.
    ///
    /// Panics if the `rows` is empty.
    ///
    /// Should prefer using [`StreamChunkBuilder`] instead to avoid unnecessary
    /// allocation of rows.
    pub fn from_rows(rows: &[(Op, impl Row)], data_types: &[DataType]) -> Self {
        let mut builder = StreamChunkBuilder::unlimited(data_types.to_vec(), Some(rows.len()));

        for (op, row) in rows {
            let none = builder.append_row(*op, row);
            // An unlimited builder never emits an intermediate chunk.
            debug_assert!(none.is_none());
        }

        builder.take().expect("chunk should not be empty")
    }

    /// Get the reference of the underlying data chunk.
    pub fn data_chunk(&self) -> &DataChunk {
        &self.data
    }

    /// compact the `StreamChunk` with its visibility map
    pub fn compact(self) -> Self {
        if self.is_compacted() {
            // Already all-visible: nothing to do.
            return self;
        }

        let (ops, columns, visibility) = self.into_inner();

        // Count of visible rows: the capacity of the compacted chunk.
        let cardinality = visibility
            .iter()
            .fold(0, |vis_cnt, vis| vis_cnt + vis as usize);
        let columns: Vec<_> = columns
            .into_iter()
            .map(|col| col.compact(&visibility, cardinality).into())
            .collect();
        // Keep only the ops of visible rows, preserving their order.
        let mut new_ops = Vec::with_capacity(cardinality);
        for idx in visibility.iter_ones() {
            new_ops.push(ops[idx]);
        }
        StreamChunk::new(new_ops, columns)
    }

    /// Split the `StreamChunk` into multiple chunks with the given size at most.
    ///
    /// When the total cardinality of all the chunks is not evenly divided by the `size`,
    /// the last new chunk will be the remainder.
    ///
    /// For consecutive `UpdateDelete` and `UpdateInsert`, they will be kept in one chunk.
    /// As a result, some chunks may have `size + 1` rows.
    pub fn split(&self, size: usize) -> Vec<Self> {
        let mut builder = StreamChunkBuilder::new(size, self.data_types());
        let mut outputs = Vec::new();

        // TODO: directly append the chunk.
        for (op, row) in self.rows() {
            // The builder yields a chunk whenever the size limit is reached.
            if let Some(chunk) = builder.append_row(op, row) {
                outputs.push(chunk);
            }
        }
        // Flush the remainder, if any.
        if let Some(output) = builder.take() {
            outputs.push(output);
        }

        outputs
    }

    /// Decompose the chunk into its data chunk and ops.
    pub fn into_parts(self) -> (DataChunk, Arc<[Op]>) {
        (self.data, self.ops)
    }

    /// Assemble a chunk from ops and a data chunk; inverse of [`Self::into_parts`].
    ///
    /// Panics if the ops length does not match the columns' lengths.
    pub fn from_parts(ops: impl Into<Arc<[Op]>>, data_chunk: DataChunk) -> Self {
        let (columns, vis) = data_chunk.into_parts();
        Self::with_visibility(ops, columns, vis)
    }

    /// Decompose the chunk into its ops, columns and visibility bitmap.
    pub fn into_inner(self) -> (Arc<[Op]>, Vec<ArrayRef>, Bitmap) {
        let (columns, vis) = self.data.into_parts();
        (self.ops, columns, vis)
    }

    /// Serialize the chunk to protobuf, compacting it first so invisible rows
    /// are not transferred.
    pub fn to_protobuf(&self) -> PbStreamChunk {
        if !self.is_compacted() {
            // Recurse once on the compacted copy (which is all-visible).
            return self.clone().compact().to_protobuf();
        }
        PbStreamChunk {
            cardinality: self.cardinality() as u32,
            ops: self.ops.iter().map(|op| op.to_protobuf() as i32).collect(),
            columns: self.columns().iter().map(|col| col.to_protobuf()).collect(),
        }
    }

    /// Deserialize a chunk from protobuf; the result is fully visible.
    pub fn from_protobuf(prost: &PbStreamChunk) -> ArrayResult<Self> {
        let cardinality = prost.get_cardinality() as usize;
        let mut ops = Vec::with_capacity(cardinality);
        for op in prost.get_ops() {
            ops.push(Op::from_protobuf(op)?);
        }
        let mut columns = vec![];
        for column in prost.get_columns() {
            columns.push(ArrayImpl::from_protobuf(column, cardinality)?.into());
        }
        Ok(StreamChunk::new(ops, columns))
    }

    /// Get the ops of the chunk, one per row slot (including invisible rows).
    pub fn ops(&self) -> &[Op] {
        &self.ops
    }

    /// Returns a table-like text representation of the `StreamChunk`.
    pub fn to_pretty(&self) -> impl Display + use<> {
        self.to_pretty_inner(None)
    }

    /// Returns a table-like text representation of the `StreamChunk` with a header of column names
    /// from the given `schema`.
    pub fn to_pretty_with_schema(&self, schema: &Schema) -> impl Display + use<> {
        self.to_pretty_inner(Some(schema))
    }

    /// Shared implementation of the `to_pretty*` methods.
    fn to_pretty_inner(&self, schema: Option<&Schema>) -> impl Display + use<> {
        use comfy_table::{Cell, CellAlignment, Table};

        if self.cardinality() == 0 {
            return Either::Left("(empty)");
        }

        let mut table = Table::new();
        table.load_preset(DataChunk::PRETTY_TABLE_PRESET);

        if let Some(schema) = schema {
            assert_eq!(self.dimension(), schema.len());
            // The first header cell is blank: it sits above the op column.
            let cells = std::iter::once(String::new())
                .chain(schema.fields().iter().map(|f| f.name.clone()));
            table.set_header(cells);
        }

        for (op, row_ref) in self.rows() {
            // One extra cell for the op marker.
            let mut cells = Vec::with_capacity(row_ref.len() + 1);
            cells.push(
                Cell::new(match op {
                    Op::Insert => "+",
                    Op::Delete => "-",
                    Op::UpdateDelete => "U-",
                    Op::UpdateInsert => "U+",
                })
                .set_alignment(CellAlignment::Right),
            );
            for datum in row_ref.iter() {
                let str = match datum {
                    None => "".to_owned(), // NULL
                    Some(scalar) => scalar.to_text(),
                };
                cells.push(Cell::new(str));
            }
            table.add_row(cells);
        }

        Either::Right(table)
    }

    /// Reorder (and possibly remove) columns.
    ///
    /// e.g. if `indices` is `[2, 1, 0]`, and the chunk contains column `[a, b, c]`, then the output
    /// will be `[c, b, a]`. If `indices` is [2, 0], then the output will be `[c, a]`.
    /// If the input mapping is identity mapping, no reorder will be performed.
    pub fn project(&self, indices: &[usize]) -> Self {
        Self {
            ops: self.ops.clone(),
            data: self.data.project(indices),
        }
    }

    /// Remove the adjacent delete-insert if their row value are the same.
    pub fn eliminate_adjacent_noop_update(self) -> Self {
        let len = self.data_chunk().capacity();
        let mut c: StreamChunkMut = self.into();
        // Index of the previous visible row that may pair with the current one.
        let mut prev_r = None;
        for curr in 0..len {
            if !c.vis(curr) {
                continue;
            }
            if let Some(prev) = prev_r {
                // A delete immediately followed by an insert of the same row
                // value is a no-op pair: hide both rows.
                if matches!(c.op(prev), Op::UpdateDelete | Op::Delete)
                    && matches!(c.op(curr), Op::UpdateInsert | Op::Insert)
                    && c.row_ref(prev) == c.row_ref(curr)
                {
                    c.set_vis(prev, false);
                    c.set_vis(curr, false);
                    prev_r = None;
                } else {
                    prev_r = Some(curr)
                }
            } else {
                prev_r = Some(curr);
            }
        }
        c.into()
    }

    /// Reorder columns and set visibility.
    pub fn project_with_vis(&self, indices: &[usize], vis: Bitmap) -> Self {
        Self {
            ops: self.ops.clone(),
            data: self.data.project_with_vis(indices, vis),
        }
    }

    /// Clone the `StreamChunk` with a new visibility.
    pub fn clone_with_vis(&self, vis: Bitmap) -> Self {
        Self {
            ops: self.ops.clone(),
            data: self.data.with_visibility(vis),
        }
    }

    /// Compute the required permits of this chunk for rate limiting.
    /// Currently simply the chunk capacity (row slots, including invisible ones).
    pub fn compute_rate_limit_chunk_permits(&self) -> u64 {
        self.capacity() as _
    }
}
368
impl Deref for StreamChunk {
    type Target = DataChunk;

    /// Expose the underlying [`DataChunk`]'s read-only API (e.g. `cardinality`,
    /// `capacity`) directly on `StreamChunk`.
    fn deref(&self) -> &Self::Target {
        &self.data
    }
}
376
impl DerefMut for StreamChunk {
    /// Expose the underlying [`DataChunk`]'s mutable API directly on `StreamChunk`.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.data
    }
}
382
383/// `StreamChunk` can be created from `DataChunk` with all operations set to `Insert`.
384impl From<DataChunk> for StreamChunk {
385    fn from(data: DataChunk) -> Self {
386        Self::from_parts(vec![Op::Insert; data.capacity()], data)
387    }
388}
389
390impl fmt::Debug for StreamChunk {
391    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
392        if f.alternate() {
393            write!(
394                f,
395                "StreamChunk {{ cardinality: {}, capacity: {}, data:\n{}\n }}",
396                self.cardinality(),
397                self.capacity(),
398                self.to_pretty()
399            )
400        } else {
401            f.debug_struct("StreamChunk")
402                .field("cardinality", &self.cardinality())
403                .field("capacity", &self.capacity())
404                .finish_non_exhaustive()
405        }
406    }
407}
408
409impl EstimateSize for StreamChunk {
410    fn estimated_heap_size(&self) -> usize {
411        self.data.estimated_heap_size() + self.ops.len() * size_of::<Op>()
412    }
413}
414
/// Internal state of [`OpsMut`]: either still sharing the original ops or
/// owning a private, mutable copy.
enum OpsMutState {
    // Shared with the source chunk; copied into `Mut` on first write.
    ArcRef(Arc<[Op]>),
    // Privately owned and mutable.
    Mut(Vec<Op>),
}
419
impl OpsMutState {
    // Placeholder parked in `self.state` while `OpsMut::set` converts the
    // shared slice into an owned vector; an empty `Vec` allocates nothing.
    const UNDEFINED: Self = Self::Mut(Vec::new());
}
423
/// A copy-on-write wrapper over a shared `Arc<[Op]>`: the ops are cloned into
/// a private `Vec` only when first mutated via [`OpsMut::set`].
pub struct OpsMut {
    state: OpsMutState,
}
427
428impl OpsMut {
429    pub fn new(ops: Arc<[Op]>) -> Self {
430        Self {
431            state: OpsMutState::ArcRef(ops),
432        }
433    }
434
435    pub fn len(&self) -> usize {
436        match &self.state {
437            OpsMutState::ArcRef(v) => v.len(),
438            OpsMutState::Mut(v) => v.len(),
439        }
440    }
441
442    pub fn is_empty(&self) -> bool {
443        self.len() == 0
444    }
445
446    pub fn set(&mut self, n: usize, val: Op) {
447        debug_assert!(n < self.len());
448        if let OpsMutState::Mut(v) = &mut self.state {
449            v[n] = val;
450        } else {
451            let state = mem::replace(&mut self.state, OpsMutState::UNDEFINED); // intermediate state
452            let mut v = match state {
453                OpsMutState::ArcRef(v) => v.to_vec(),
454                OpsMutState::Mut(_) => unreachable!(),
455            };
456            v[n] = val;
457            self.state = OpsMutState::Mut(v);
458        }
459    }
460
461    pub fn get(&self, n: usize) -> Op {
462        debug_assert!(n < self.len());
463        match &self.state {
464            OpsMutState::ArcRef(v) => v[n],
465            OpsMutState::Mut(v) => v[n],
466        }
467    }
468}
469impl From<OpsMut> for Arc<[Op]> {
470    fn from(v: OpsMut) -> Self {
471        match v.state {
472            OpsMutState::ArcRef(a) => a,
473            OpsMutState::Mut(v) => v.into(),
474        }
475    }
476}
477
/// A mutable wrapper for `StreamChunk`: it can only set the visibilities and
/// ops in place, and cannot change the length.
pub struct StreamChunkMut {
    // Column arrays, shared with the source chunk; never mutated here.
    columns: Arc<[ArrayRef]>,
    // Copy-on-write ops.
    ops: OpsMut,
    // Mutable visibility bits.
    vis: BitmapBuilder,
}
485
486impl From<StreamChunk> for StreamChunkMut {
487    fn from(c: StreamChunk) -> Self {
488        let (c, ops) = c.into_parts();
489        let (columns, vis) = c.into_parts_v2();
490        Self {
491            columns,
492            ops: OpsMut::new(ops),
493            vis: vis.into(),
494        }
495    }
496}
497
498impl From<StreamChunkMut> for StreamChunk {
499    fn from(c: StreamChunkMut) -> Self {
500        StreamChunk::from_parts(c.ops, DataChunk::from_parts(c.columns, c.vis.finish()))
501    }
502}
503
/// A reference to one row of a [`StreamChunkMut`], allowing its op and
/// visibility to be updated in place.
pub struct OpRowMutRef<'a> {
    // The chunk this row belongs to.
    c: &'a mut StreamChunkMut,
    // Row index within the chunk.
    i: usize,
}
508
impl OpRowMutRef<'_> {
    /// Row index within the chunk.
    pub fn index(&self) -> usize {
        self.i
    }

    /// Whether this row is visible.
    pub fn vis(&self) -> bool {
        self.c.vis.is_set(self.i)
    }

    /// The op of this row.
    pub fn op(&self) -> Op {
        self.c.ops.get(self.i)
    }

    /// Set the visibility of this row.
    pub fn set_vis(&mut self, val: bool) {
        self.c.set_vis(self.i, val);
    }

    /// Set the op of this row.
    pub fn set_op(&mut self, val: Op) {
        self.c.set_op(self.i, val);
    }

    /// Borrow this row's values.
    pub fn row_ref(&self) -> RowRef<'_> {
        RowRef::with_columns(self.c.columns(), self.i)
    }

    /// Return whether the two row refs point into the same chunk.
    pub fn same_chunk(&self, other: &Self) -> bool {
        std::ptr::eq(self.c, other.c)
    }
}
539
impl StreamChunkMut {
    /// Total number of row slots, including invisible ones.
    pub fn capacity(&self) -> usize {
        self.vis.len()
    }

    /// Whether row `i` is visible.
    pub fn vis(&self, i: usize) -> bool {
        self.vis.is_set(i)
    }

    /// The op of row `i`.
    pub fn op(&self, i: usize) -> Op {
        self.ops.get(i)
    }

    /// Borrow the values of row `i`.
    pub fn row_ref(&self, i: usize) -> RowRef<'_> {
        RowRef::with_columns(self.columns(), i)
    }

    /// Set the visibility of row `n`.
    pub fn set_vis(&mut self, n: usize, val: bool) {
        self.vis.set(n, val);
    }

    /// Set the op of row `n` (copies the shared ops on first mutation).
    pub fn set_op(&mut self, n: usize, val: Op) {
        self.ops.set(n, val);
    }

    /// The column arrays of the chunk.
    pub fn columns(&self) -> &[ArrayRef] {
        &self.columns
    }

    /// Iterate over the visible rows, yielding for each a [`RowRef`] to its
    /// values together with an [`OpRowMutRef`] that can mutate its op and
    /// visibility in place.
    pub fn to_rows_mut(&mut self) -> impl Iterator<Item = (RowRef<'_>, OpRowMutRef<'_>)> {
        unsafe {
            // SAFETY(review): each yielded item conjures a `&mut StreamChunkMut`
            // from a raw pointer to `self` while `RowRef` simultaneously borrows
            // the columns, so the references alias. Soundness appears to rely on
            // `OpRowMutRef` mutating only `ops`/`vis` while `RowRef` reads only
            // `columns` — confirm before changing either type.
            (0..self.vis.len())
                .filter(|i| self.vis.is_set(*i))
                .map(|i| {
                    let p = self as *const StreamChunkMut;
                    let p = p as *mut StreamChunkMut;
                    (
                        RowRef::with_columns(self.columns(), i),
                        OpRowMutRef { c: &mut *p, i },
                    )
                })
        }
    }
}
585
/// Test utilities for [`StreamChunk`].
#[easy_ext::ext(StreamChunkTestExt)]
impl StreamChunk {
    /// Parse a chunk from string.
    ///
    /// See also [`DataChunkTestExt::from_pretty`].
    ///
    /// # Format
    ///
    /// The first line is a header indicating the column types.
    /// The following lines indicate rows within the chunk.
    /// Each line starts with an operation followed by values.
    /// NULL values are represented as `.`.
    ///
    /// # Example
    /// ```
    /// use risingwave_common::array::StreamChunk;
    /// use risingwave_common::array::stream_chunk::StreamChunkTestExt as _;
    /// let chunk = StreamChunk::from_pretty(
    ///     "  I I I I      // type chars
    ///     U- 2 5 . .      // '.' means NULL
    ///     U+ 2 5 2 6 D    // 'D' means deleted in visibility
    ///     +  . . 4 8      // ^ comments are ignored
    ///     -  . . 3 4",
    /// );
    /// //  ^ operations:
    /// //     +: Insert
    /// //     -: Delete
    /// //    U+: UpdateInsert
    /// //    U-: UpdateDelete
    ///
    /// // type chars:
    /// //     I: i64
    /// //     i: i32
    /// //     F: f64
    /// //     f: f32
    /// //     T: str
    /// //    TS: Timestamp
    /// //    TZ: Timestamptz
    /// //   SRL: Serial
    /// //   x[]: array of x
    /// // <i,f>: struct
    /// ```
    pub fn from_pretty(s: &str) -> Self {
        // Rows with the leading op token stripped; handed to `DataChunk::from_pretty`.
        let mut chunk_str = String::new();
        let mut ops = vec![];

        let (header, body) = match s.split_once('\n') {
            Some(pair) => pair,
            None => {
                // empty chunk
                return StreamChunk {
                    ops: Arc::new([]),
                    data: DataChunk::from_pretty(s),
                };
            }
        };
        chunk_str.push_str(header);
        chunk_str.push('\n');

        for line in body.split_inclusive('\n') {
            if line.trim_start().is_empty() {
                continue;
            }
            // Split the leading op token from the rest of the row.
            let (op, row) = line
                .trim_start()
                .split_once(|c: char| c.is_ascii_whitespace())
                .ok_or_else(|| panic!("missing operation: {line:?}"))
                .unwrap();
            ops.push(match op {
                "+" => Op::Insert,
                "-" => Op::Delete,
                "U+" => Op::UpdateInsert,
                "U-" => Op::UpdateDelete,
                t => panic!("invalid op: {t:?}"),
            });
            chunk_str.push_str(row);
        }
        StreamChunk {
            ops: ops.into(),
            data: DataChunk::from_pretty(&chunk_str),
        }
    }

    /// Validate the `StreamChunk` layout.
    pub fn valid(&self) -> bool {
        // `ops`, visibility and every column must all share the same length.
        let len = self.ops.len();
        let data = &self.data;
        data.visibility().len() == len && data.columns().iter().all(|col| col.len() == len)
    }

    /// Concatenate multiple `StreamChunk` into one.
    ///
    /// Panics if `chunks` is empty.
    pub fn concat(chunks: Vec<StreamChunk>) -> StreamChunk {
        let data_types = chunks[0].data_types();
        // Total visible rows, used to pre-size the builder.
        let size = chunks.iter().map(|c| c.cardinality()).sum::<usize>();

        let mut builder = StreamChunkBuilder::unlimited(data_types, Some(size));

        for chunk in chunks {
            // TODO: directly append chunks.
            for (op, row) in chunk.rows() {
                let none = builder.append_row(op, row);
                // An unlimited builder never emits an intermediate chunk.
                debug_assert!(none.is_none());
            }
        }

        builder.take().expect("chunk should not be empty")
    }

    /// Sort rows by `(op, row value)`.
    ///
    /// NOTE(review): the collected row list is indexed by positions
    /// `0..capacity()`, which assumes every row slot is visible (a compacted
    /// chunk) — confirm before calling this on chunks with hidden rows.
    pub fn sort_rows(self) -> Self {
        if self.capacity() == 0 {
            return self;
        }
        let rows = self.rows().collect_vec();
        let mut idx = (0..self.capacity()).collect_vec();
        idx.sort_by_key(|&i| {
            let (op, row_ref) = rows[i];
            (op, DefaultOrdered(row_ref))
        });
        StreamChunk {
            ops: idx.iter().map(|&i| self.ops[i]).collect(),
            data: self.data.reorder_rows(&idx),
        }
    }

    /// Generate `num_of_chunks` data chunks with type `data_types`,
    /// where each data chunk has cardinality of `chunk_size`.
    /// TODO(kwannoel): Generate different types of op, different vis.
    pub fn gen_stream_chunks(
        num_of_chunks: usize,
        chunk_size: usize,
        data_types: &[DataType],
        varchar_properties: &VarcharProperty,
    ) -> Vec<StreamChunk> {
        // Fully visible, inserts only.
        Self::gen_stream_chunks_inner(
            num_of_chunks,
            chunk_size,
            data_types,
            varchar_properties,
            1.0,
            1.0,
        )
    }

    /// Like [`Self::gen_stream_chunks`], but also controls the fraction of
    /// visible rows and of `Insert` ops.
    ///
    /// Note that a single `ops` vector is generated and shared by all chunks.
    pub fn gen_stream_chunks_inner(
        num_of_chunks: usize,
        chunk_size: usize,
        data_types: &[DataType],
        varchar_properties: &VarcharProperty,
        visibility_percent: f64, // % of rows that are visible
        inserts_percent: f64,    // Rest will be deletes.
    ) -> Vec<StreamChunk> {
        let ops = if inserts_percent == 0.0 {
            vec![Op::Delete; chunk_size]
        } else if inserts_percent == 1.0 {
            vec![Op::Insert; chunk_size]
        } else {
            // Fixed seed keeps the generated ops deterministic across runs.
            let mut rng = SmallRng::from_seed([0; 32]);
            let mut ops = vec![];
            for _ in 0..chunk_size {
                ops.push(if rng.random_bool(inserts_percent) {
                    Op::Insert
                } else {
                    Op::Delete
                });
            }
            ops
        };
        DataChunk::gen_data_chunks(
            num_of_chunks,
            chunk_size,
            data_types,
            varchar_properties,
            visibility_percent,
        )
        .into_iter()
        .map(|chunk| StreamChunk::from_parts(ops.clone(), chunk))
        .collect()
    }
}
769
#[cfg(test)]
mod tests {
    use super::*;

    /// `to_pretty` renders one row per line with a right-aligned op column.
    #[test]
    fn test_to_pretty_string() {
        let chunk = StreamChunk::from_pretty(
            "  I I
             + 1 6
             - 2 .
            U- 3 7
            U+ 4 .",
        );
        assert_eq!(
            chunk.to_pretty().to_string(),
            "\
+----+---+---+
|  + | 1 | 6 |
|  - | 2 |   |
| U- | 3 | 7 |
| U+ | 4 |   |
+----+---+---+"
        );
    }

    /// 4 rows split by 2 yields two chunks of 2 rows each.
    #[test]
    fn test_split_1() {
        let chunk = StreamChunk::from_pretty(
            "  I I
             + 1 6
             - 2 .
            U- 3 7
            U+ 4 .",
        );
        let results = chunk.split(2);
        assert_eq!(2, results.len());
        assert_eq!(
            results[0].to_pretty().to_string(),
            "\
+---+---+---+
| + | 1 | 6 |
| - | 2 |   |
+---+---+---+"
        );
        assert_eq!(
            results[1].to_pretty().to_string(),
            "\
+----+---+---+
| U- | 3 | 7 |
| U+ | 4 |   |
+----+---+---+"
        );
    }

    /// A `U-`/`U+` pair straddling the size boundary stays in one chunk,
    /// so the first chunk gets `size + 1` rows.
    #[test]
    fn test_split_2() {
        let chunk = StreamChunk::from_pretty(
            "  I I
             + 1 6
            U- 3 7
            U+ 4 .
             - 2 .",
        );
        let results = chunk.split(2);
        assert_eq!(2, results.len());
        assert_eq!(
            results[0].to_pretty().to_string(),
            "\
+----+---+---+
|  + | 1 | 6 |
| U- | 3 | 7 |
| U+ | 4 |   |
+----+---+---+"
        );
        assert_eq!(
            results[1].to_pretty().to_string(),
            "\
+---+---+---+
| - | 2 |   |
+---+---+---+"
        );
    }

    /// Adjacent delete/insert pairs with identical values (e.g. `U- 3 7` /
    /// `U+ 3 7`) are hidden; rows marked `D` are invisible and skipped.
    #[test]
    fn test_eliminate_adjacent_noop_update() {
        let c = StreamChunk::from_pretty(
            "  I I
            - 1 6 D
            - 2 2
            + 2 3
            - 2 3
            + 1 6
            - 1 7
            + 1 10 D
            + 1 7
            U- 3 7
            U+ 3 7
            + 2 3",
        );
        let c = c.eliminate_adjacent_noop_update();
        assert_eq!(
            c.to_pretty().to_string(),
            "\
+---+---+---+
| - | 2 | 2 |
| + | 2 | 3 |
| - | 2 | 3 |
| + | 1 | 6 |
| + | 2 | 3 |
+---+---+---+"
        );
    }
}