risingwave_common/array/
stream_chunk.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Display;
16use std::mem::size_of;
17use std::ops::{Deref, DerefMut};
18use std::sync::{Arc, LazyLock};
19use std::{fmt, mem};
20
21use either::Either;
22use enum_as_inner::EnumAsInner;
23use itertools::Itertools;
24use rand::prelude::SmallRng;
25use rand::{Rng, SeedableRng};
26use risingwave_common_estimate_size::EstimateSize;
27use risingwave_pb::data::{PbOp, PbStreamChunk};
28
29use super::stream_chunk_builder::StreamChunkBuilder;
30use super::{ArrayImpl, ArrayRef, ArrayResult, DataChunkTestExt, RowRef};
31use crate::array::DataChunk;
32use crate::bitmap::{Bitmap, BitmapBuilder};
33use crate::catalog::Schema;
34use crate::field_generator::VarcharProperty;
35use crate::row::Row;
36use crate::types::{DataType, DefaultOrdered, ToText};
37
/// `Op` represents three operations in `StreamChunk`.
///
/// `UpdateDelete` and `UpdateInsert` are semantically equivalent to `Delete` and `Insert`
/// but always appear in pairs to represent an update operation. It's guaranteed that
/// they are adjacent to each other in the same `StreamChunk`, and the stream key of the two
/// rows are the same.
#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash, EnumAsInner)]
pub enum Op {
    /// Insert a new row.
    Insert,
    /// Delete an existing row.
    Delete,
    /// Delete the old value of an updated row; always paired with a following `UpdateInsert`.
    UpdateDelete,
    /// Insert the new value of an updated row; always paired with a preceding `UpdateDelete`.
    UpdateInsert,
}
51
52impl Op {
53    pub fn to_protobuf(self) -> PbOp {
54        match self {
55            Op::Insert => PbOp::Insert,
56            Op::Delete => PbOp::Delete,
57            Op::UpdateInsert => PbOp::UpdateInsert,
58            Op::UpdateDelete => PbOp::UpdateDelete,
59        }
60    }
61
62    pub fn from_protobuf(prost: &i32) -> ArrayResult<Op> {
63        let op = match PbOp::try_from(*prost) {
64            Ok(PbOp::Insert) => Op::Insert,
65            Ok(PbOp::Delete) => Op::Delete,
66            Ok(PbOp::UpdateInsert) => Op::UpdateInsert,
67            Ok(PbOp::UpdateDelete) => Op::UpdateDelete,
68            Ok(PbOp::Unspecified) => unreachable!(),
69            Err(_) => bail!("No such op type"),
70        };
71        Ok(op)
72    }
73
74    /// convert `UpdateDelete` to `Delete` and `UpdateInsert` to Insert
75    pub fn normalize_update(self) -> Op {
76        match self {
77            Op::Insert => Op::Insert,
78            Op::Delete => Op::Delete,
79            Op::UpdateDelete => Op::Delete,
80            Op::UpdateInsert => Op::Insert,
81        }
82    }
83
84    pub fn to_i16(self) -> i16 {
85        match self {
86            Op::Insert => 1,
87            Op::Delete => 2,
88            Op::UpdateInsert => 3,
89            Op::UpdateDelete => 4,
90        }
91    }
92
93    pub fn to_varchar(self) -> String {
94        match self {
95            Op::Insert => "Insert",
96            Op::Delete => "Delete",
97            Op::UpdateInsert => "UpdateInsert",
98            Op::UpdateDelete => "UpdateDelete",
99        }
100        .to_owned()
101    }
102}
103
/// `StreamChunk` is used to pass data over the streaming pathway.
#[derive(Clone, PartialEq)]
pub struct StreamChunk {
    // TODO: Optimize using bitmap
    /// One op per row in `data`; the lengths are kept in sync (asserted in `with_visibility`).
    ops: Arc<[Op]>,
    /// The underlying columnar data, including the visibility bitmap.
    data: DataChunk,
}
111
112impl Default for StreamChunk {
113    /// Create a 0-row-0-col `StreamChunk`. Only used in some existing tests.
114    /// This is NOT the same as an **empty** chunk, which has 0 rows but with
115    /// columns aligned with executor schema.
116    fn default() -> Self {
117        Self {
118            ops: Arc::new([]),
119            data: DataChunk::new(vec![], 0),
120        }
121    }
122}
123
impl StreamChunk {
    /// Create a new `StreamChunk` with given ops and columns.
    ///
    /// All rows are visible; see [`Self::with_visibility`] to set a visibility bitmap.
    pub fn new(ops: impl Into<Arc<[Op]>>, columns: Vec<ArrayRef>) -> Self {
        let ops = ops.into();
        let visibility = Bitmap::ones(ops.len());
        Self::with_visibility(ops, columns, visibility)
    }

    /// Create a new `StreamChunk` with given ops, columns and visibility.
    ///
    /// Panics if any column's length differs from the number of ops.
    pub fn with_visibility(
        ops: impl Into<Arc<[Op]>>,
        columns: Vec<ArrayRef>,
        visibility: Bitmap,
    ) -> Self {
        let ops = ops.into();
        // Every column must carry exactly one entry per op.
        for col in &columns {
            assert_eq!(col.len(), ops.len());
        }
        let data = DataChunk::new(columns, visibility);
        StreamChunk { ops, data }
    }

    /// Build a `StreamChunk` from rows.
    ///
    /// Panics if the `rows` is empty.
    ///
    /// Should prefer using [`StreamChunkBuilder`] instead to avoid unnecessary
    /// allocation of rows.
    pub fn from_rows(rows: &[(Op, impl Row)], data_types: &[DataType]) -> Self {
        let mut builder = StreamChunkBuilder::unlimited(data_types.to_vec(), Some(rows.len()));

        for (op, row) in rows {
            // The builder is unlimited, so `append_row` never yields an intermediate chunk.
            let none = builder.append_row(*op, row);
            debug_assert!(none.is_none());
        }

        builder.take().expect("chunk should not be empty")
    }

    /// Create an **empty** chunk: 0 rows, with columns aligned with `data_types`.
    pub fn empty(data_types: &[DataType]) -> Self {
        StreamChunkBuilder::build_empty(data_types.to_vec())
    }

    /// Get the reference of the underlying data chunk.
    pub fn data_chunk(&self) -> &DataChunk {
        &self.data
    }

    /// Removes the invisible rows based on `visibility`. Returns a new compacted chunk
    /// with all rows visible.
    ///
    /// This does not change the visible content of the chunk. Not to be confused with
    /// `StreamChunkCompactor`, which removes unnecessary changes based on the key.
    ///
    /// See [`DataChunk::compact_vis`] for more details.
    pub fn compact_vis(self) -> Self {
        if self.is_vis_compacted() {
            return self;
        }

        let (ops, columns, visibility) = self.into_inner();

        // Count visible rows to size the compacted chunk.
        let cardinality = visibility
            .iter()
            .fold(0, |vis_cnt, vis| vis_cnt + vis as usize);
        let columns: Vec<_> = columns
            .into_iter()
            .map(|col| col.compact_vis(&visibility, cardinality).into())
            .collect();
        // Keep only the ops of visible rows, preserving their order.
        let mut new_ops = Vec::with_capacity(cardinality);
        for idx in visibility.iter_ones() {
            new_ops.push(ops[idx]);
        }
        StreamChunk::new(new_ops, columns)
    }

    /// Split the `StreamChunk` into multiple chunks with the given size at most.
    ///
    /// When the total cardinality of all the chunks is not evenly divided by the `size`,
    /// the last new chunk will be the remainder.
    ///
    /// For consecutive `UpdateDelete` and `UpdateInsert`, they will be kept in one chunk.
    /// As a result, some chunks may have `size + 1` rows.
    pub fn split(&self, size: usize) -> Vec<Self> {
        let mut builder = StreamChunkBuilder::new(size, self.data_types());
        let mut outputs = Vec::new();

        // TODO: directly append the chunk.
        for (op, row) in self.rows() {
            if let Some(chunk) = builder.append_row(op, row) {
                outputs.push(chunk);
            }
        }
        // Flush the remainder, if any.
        if let Some(output) = builder.take() {
            outputs.push(output);
        }

        outputs
    }

    /// Decompose the chunk into the underlying data chunk and the ops.
    pub fn into_parts(self) -> (DataChunk, Arc<[Op]>) {
        (self.data, self.ops)
    }

    /// Assemble a `StreamChunk` from ops and a `DataChunk`, keeping the
    /// data chunk's visibility.
    pub fn from_parts(ops: impl Into<Arc<[Op]>>, data_chunk: DataChunk) -> Self {
        let (columns, vis) = data_chunk.into_parts();
        Self::with_visibility(ops, columns, vis)
    }

    /// Decompose the chunk into ops, columns and the visibility bitmap.
    pub fn into_inner(self) -> (Arc<[Op]>, Vec<ArrayRef>, Bitmap) {
        let (columns, vis) = self.data.into_parts();
        (self.ops, columns, vis)
    }

    /// Serialize the chunk to protobuf.
    ///
    /// The chunk is visibility-compacted first, so invisible rows are never transferred.
    pub fn to_protobuf(&self) -> PbStreamChunk {
        if !self.is_vis_compacted() {
            return self.clone().compact_vis().to_protobuf();
        }
        PbStreamChunk {
            cardinality: self.cardinality() as u32,
            ops: self.ops.iter().map(|op| op.to_protobuf() as i32).collect(),
            columns: self.columns().iter().map(|col| col.to_protobuf()).collect(),
        }
    }

    /// Deserialize a chunk from protobuf. All rows of the result are visible.
    pub fn from_protobuf(prost: &PbStreamChunk) -> ArrayResult<Self> {
        let cardinality = prost.get_cardinality() as usize;
        let mut ops = Vec::with_capacity(cardinality);
        for op in prost.get_ops() {
            ops.push(Op::from_protobuf(op)?);
        }
        let mut columns = vec![];
        for column in prost.get_columns() {
            columns.push(ArrayImpl::from_protobuf(column, cardinality)?.into());
        }
        Ok(StreamChunk::new(ops, columns))
    }

    /// Get the ops of the chunk, one per row (including invisible rows).
    pub fn ops(&self) -> &[Op] {
        &self.ops
    }

    /// Returns a table-like text representation of the `StreamChunk`.
    pub fn to_pretty(&self) -> impl Display + use<> {
        self.to_pretty_inner(None)
    }

    /// Returns a table-like text representation of the `StreamChunk` with a header of column names
    /// from the given `schema`.
    pub fn to_pretty_with_schema(&self, schema: &Schema) -> impl Display + use<> {
        self.to_pretty_inner(Some(schema))
    }

    /// Shared implementation of [`Self::to_pretty`] and [`Self::to_pretty_with_schema`].
    /// Only visible rows are rendered.
    fn to_pretty_inner(&self, schema: Option<&Schema>) -> impl Display + use<> {
        use comfy_table::{Cell, CellAlignment, Table};

        if self.cardinality() == 0 {
            return Either::Left("(empty)");
        }

        let mut table = Table::new();
        table.load_preset(DataChunk::PRETTY_TABLE_PRESET);

        if let Some(schema) = schema {
            assert_eq!(self.dimension(), schema.len());
            // The first header cell is blank: it sits above the op column.
            let cells = std::iter::once(String::new())
                .chain(schema.fields().iter().map(|f| f.name.clone()));
            table.set_header(cells);
        }

        for (op, row_ref) in self.rows() {
            let mut cells = Vec::with_capacity(row_ref.len() + 1);
            cells.push(
                Cell::new(match op {
                    Op::Insert => "+",
                    Op::Delete => "-",
                    Op::UpdateDelete => "U-",
                    Op::UpdateInsert => "U+",
                })
                .set_alignment(CellAlignment::Right),
            );
            for datum in row_ref.iter() {
                let str = match datum {
                    None => "".to_owned(), // NULL
                    Some(scalar) => scalar.to_text(),
                };
                cells.push(Cell::new(str));
            }
            table.add_row(cells);
        }

        Either::Right(table)
    }

    /// Reorder (and possibly remove) columns.
    ///
    /// e.g. if `indices` is `[2, 1, 0]`, and the chunk contains column `[a, b, c]`, then the output
    /// will be `[c, b, a]`. If `indices` is [2, 0], then the output will be `[c, a]`.
    /// If the input mapping is identity mapping, no reorder will be performed.
    pub fn project(&self, indices: &[usize]) -> Self {
        Self {
            ops: self.ops.clone(),
            data: self.data.project(indices),
        }
    }

    /// Remove the adjacent delete-insert and insert-deletes if their row value are the same.
    pub fn eliminate_adjacent_noop_update(self) -> Self {
        let len = self.data_chunk().capacity();
        let mut c: StreamChunkMut = self.into();
        // Index of the previous visible row that has not been eliminated yet.
        let mut prev_r = None;
        for curr in 0..len {
            if !c.vis(curr) {
                continue;
            }
            if let Some(prev) = prev_r
                && (
                    // 1. Delete then Insert
                    (matches!(c.op(prev), Op::UpdateDelete | Op::Delete)
                    && matches!(c.op(curr), Op::UpdateInsert | Op::Insert))
                    ||
                    // 2. Insert then Delete
                    //
                    // Note that after eliminating `U+` and `U-` here, we will get a new
                    // pair of `U-` and `U+` consisting of `prev.prev` and the `next` row.
                    // `prev.prev` and `prev`, `curr` and `next` share the same stream key
                    // because they are `U-` and `U+` pairs, while `prev` and `curr` share
                    // the same stream key because they are equal, so `prev-prev` and `next`
                    // must also share the same stream key. Therefore, it's okay to leave
                    // their `Update` ops unchanged.
                    //
                    // Also, they can't be the same, otherwise these 4 rows are the same,
                    // which should already be eliminated by the first branch.
                    (matches!(c.op(prev), Op::UpdateInsert | Op::Insert)
                    && matches!(c.op(curr), Op::UpdateDelete | Op::Delete))
                )
                && c.row_ref(prev) == c.row_ref(curr)
            {
                // The pair cancels out: hide both rows instead of rebuilding the chunk.
                c.set_vis(prev, false);
                c.set_vis(curr, false);
                prev_r = None;
            } else {
                prev_r = Some(curr);
            }
        }
        c.into()
    }

    /// Reorder columns and set visibility.
    pub fn project_with_vis(&self, indices: &[usize], vis: Bitmap) -> Self {
        Self {
            ops: self.ops.clone(),
            data: self.data.project_with_vis(indices, vis),
        }
    }

    /// Clone the `StreamChunk` with a new visibility.
    pub fn clone_with_vis(&self, vis: Bitmap) -> Self {
        Self {
            ops: self.ops.clone(),
            data: self.data.with_visibility(vis),
        }
    }
}
388
/// Expose all read-only `DataChunk` methods (e.g. `cardinality`, `columns`)
/// directly on `StreamChunk`.
impl Deref for StreamChunk {
    type Target = DataChunk;

    fn deref(&self) -> &Self::Target {
        &self.data
    }
}
396
/// Expose mutable `DataChunk` methods directly on `StreamChunk`.
impl DerefMut for StreamChunk {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.data
    }
}
402
403/// `StreamChunk` can be created from `DataChunk` with all operations set to `Insert`.
404impl From<DataChunk> for StreamChunk {
405    fn from(data: DataChunk) -> Self {
406        Self::from_parts(vec![Op::Insert; data.capacity()], data)
407    }
408}
409
410impl fmt::Debug for StreamChunk {
411    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
412        if f.alternate() {
413            write!(
414                f,
415                "StreamChunk {{ cardinality: {}, capacity: {}, data:\n{}\n }}",
416                self.cardinality(),
417                self.capacity(),
418                self.to_pretty()
419            )
420        } else {
421            f.debug_struct("StreamChunk")
422                .field("cardinality", &self.cardinality())
423                .field("capacity", &self.capacity())
424                .finish_non_exhaustive()
425        }
426    }
427}
428
impl EstimateSize for StreamChunk {
    fn estimated_heap_size(&self) -> usize {
        // Heap usage = the data chunk's heap plus the `Arc<[Op]>` buffer.
        self.data.estimated_heap_size() + self.ops.len() * size_of::<Op>()
    }
}
434
/// Internal state of [`OpsMut`]: either still sharing the original ops,
/// or an owned copy after the first mutation (copy-on-write).
enum OpsMutState {
    /// Shared, not-yet-mutated ops.
    ArcRef(Arc<[Op]>),
    /// Owned ops, produced on first mutation.
    Mut(Vec<Op>),
}
439
impl OpsMutState {
    /// Cheap placeholder used as the intermediate value for `mem::replace`
    /// in `OpsMut::set` (an empty `Vec` does not allocate).
    const UNDEFINED: Self = Self::Mut(Vec::new());
}
443
/// A copy-on-write wrapper around `Arc<[Op]>` that allows updating ops in place,
/// copying the shared slice only on the first mutation.
pub struct OpsMut {
    state: OpsMutState,
}
447
448impl OpsMut {
449    pub fn new(ops: Arc<[Op]>) -> Self {
450        Self {
451            state: OpsMutState::ArcRef(ops),
452        }
453    }
454
455    pub fn len(&self) -> usize {
456        match &self.state {
457            OpsMutState::ArcRef(v) => v.len(),
458            OpsMutState::Mut(v) => v.len(),
459        }
460    }
461
462    pub fn is_empty(&self) -> bool {
463        self.len() == 0
464    }
465
466    pub fn set(&mut self, n: usize, val: Op) {
467        debug_assert!(n < self.len());
468        if let OpsMutState::Mut(v) = &mut self.state {
469            v[n] = val;
470        } else {
471            let state = mem::replace(&mut self.state, OpsMutState::UNDEFINED); // intermediate state
472            let mut v = match state {
473                OpsMutState::ArcRef(v) => v.to_vec(),
474                OpsMutState::Mut(_) => unreachable!(),
475            };
476            v[n] = val;
477            self.state = OpsMutState::Mut(v);
478        }
479    }
480
481    pub fn get(&self, n: usize) -> Op {
482        debug_assert!(n < self.len());
483        match &self.state {
484            OpsMutState::ArcRef(v) => v[n],
485            OpsMutState::Mut(v) => v[n],
486        }
487    }
488}
impl From<OpsMut> for Arc<[Op]> {
    fn from(v: OpsMut) -> Self {
        match v.state {
            // Never mutated: hand back the original `Arc` without copying.
            OpsMutState::ArcRef(a) => a,
            // Mutated: freeze the owned `Vec` into a new `Arc`.
            OpsMutState::Mut(v) => v.into(),
        }
    }
}
497
/// A mutable wrapper for `StreamChunk`. can only set the visibilities and ops in place, can not
/// change the length.
pub struct StreamChunkMut {
    /// Column data; read-only through this wrapper.
    columns: Arc<[ArrayRef]>,
    /// Copy-on-write ops, mutable per row.
    ops: OpsMut,
    /// Visibility bits, mutable per row.
    vis: BitmapBuilder,
}
505
506impl From<StreamChunk> for StreamChunkMut {
507    fn from(c: StreamChunk) -> Self {
508        let (c, ops) = c.into_parts();
509        let (columns, vis) = c.into_parts_v2();
510        Self {
511            columns,
512            ops: OpsMut::new(ops),
513            vis: vis.into(),
514        }
515    }
516}
517
/// Freeze the mutable wrapper back into an immutable `StreamChunk`.
impl From<StreamChunkMut> for StreamChunk {
    fn from(c: StreamChunkMut) -> Self {
        StreamChunk::from_parts(c.ops, DataChunk::from_parts(c.columns, c.vis.finish()))
    }
}
523
/// A reference to a single row of a [`StreamChunkMut`], allowing its op and
/// visibility to be updated in place.
pub struct OpRowMutRef<'a> {
    /// The chunk this row belongs to.
    c: &'a mut StreamChunkMut,
    /// Row index within the chunk.
    i: usize,
}
528
// Act as a placeholder value when using in `ChangeBuffer`.
impl Default for OpRowMutRef<'_> {
    fn default() -> Self {
        // Process-wide dummy chunk (0 rows, 0 columns) the placeholder points into.
        static mut DUMMY_CHUNK_MUT: LazyLock<StreamChunkMut> =
            LazyLock::new(|| StreamChunk::default().into());

        // NOTE(review): each call hands out a `&mut` derived from the same
        // `static mut`, so two live placeholders alias mutably. Presumably a
        // placeholder is never read or written through — confirm with callers.
        #[allow(clippy::deref_addrof)] // false positive
        OpRowMutRef {
            c: unsafe { &mut *(&raw mut DUMMY_CHUNK_MUT) },
            i: 0,
        }
    }
}
542
impl PartialEq for OpRowMutRef<'_> {
    fn eq(&self, other: &Self) -> bool {
        // Equality is by row content only; op and visibility are not compared.
        self.row_ref() == other.row_ref()
    }
}
impl Eq for OpRowMutRef<'_> {}
549
impl<'a> OpRowMutRef<'a> {
    /// Row index within the owning chunk.
    pub fn index(&self) -> usize {
        self.i
    }

    /// Whether this row is currently visible.
    pub fn vis(&self) -> bool {
        self.c.vis.is_set(self.i)
    }

    /// The op of this row.
    pub fn op(&self) -> Op {
        self.c.ops.get(self.i)
    }

    /// Set this row's visibility in place.
    pub fn set_vis(&mut self, val: bool) {
        self.c.set_vis(self.i, val);
    }

    /// Set this row's op in place.
    pub fn set_op(&mut self, val: Op) {
        self.c.set_op(self.i, val);
    }

    /// Borrow this row's column values.
    pub fn row_ref(&self) -> RowRef<'_> {
        RowRef::with_columns(self.c.columns(), self.i)
    }

    /// return if the two row ref is in the same chunk
    pub fn same_chunk(&self, other: &Self) -> bool {
        std::ptr::eq(self.c, other.c)
    }
}
580
impl StreamChunkMut {
    /// Total number of rows, including invisible ones.
    pub fn capacity(&self) -> usize {
        self.vis.len()
    }

    /// Whether row `i` is visible.
    pub fn vis(&self, i: usize) -> bool {
        self.vis.is_set(i)
    }

    /// The op of row `i`.
    pub fn op(&self, i: usize) -> Op {
        self.ops.get(i)
    }

    /// Borrow the column values of row `i`.
    pub fn row_ref(&self, i: usize) -> RowRef<'_> {
        RowRef::with_columns(self.columns(), i)
    }

    /// Set the visibility of row `n` in place.
    pub fn set_vis(&mut self, n: usize, val: bool) {
        self.vis.set(n, val);
    }

    /// Set the op of row `n` in place.
    pub fn set_op(&mut self, n: usize, val: Op) {
        self.ops.set(n, val);
    }

    /// The column data of the chunk.
    pub fn columns(&self) -> &[ArrayRef] {
        &self.columns
    }

    /// get the mut reference of the stream chunk.
    ///
    /// Yields only rows that are visible at iteration time.
    pub fn to_rows_mut(&mut self) -> impl Iterator<Item = (RowRef<'_>, OpRowMutRef<'_>)> {
        // SAFETY: `OpRowMutRef` can only mutate the visibility and ops, which is safe even
        // when the columns are borrowed by `RowRef` at the same time.
        unsafe {
            (0..self.vis.len())
                .filter(|i| self.vis.is_set(*i))
                .map(|i| {
                    let p = self as *const StreamChunkMut;
                    let p = p as *mut StreamChunkMut;
                    (
                        RowRef::with_columns(self.columns(), i),
                        OpRowMutRef { c: &mut *p, i },
                    )
                })
        }
    }
}
628
/// Test utilities for [`StreamChunk`].
#[easy_ext::ext(StreamChunkTestExt)]
impl StreamChunk {
    /// Parse a chunk from string.
    ///
    /// See also [`DataChunkTestExt::from_pretty`].
    ///
    /// # Format
    ///
    /// The first line is a header indicating the column types.
    /// The following lines indicate rows within the chunk.
    /// Each line starts with an operation followed by values.
    /// NULL values are represented as `.`.
    ///
    /// # Example
    /// ```
    /// use risingwave_common::array::StreamChunk;
    /// use risingwave_common::array::stream_chunk::StreamChunkTestExt as _;
    /// let chunk = StreamChunk::from_pretty(
    ///     "  I I I I      // type chars
    ///     U- 2 5 . .      // '.' means NULL
    ///     U+ 2 5 2 6 D    // 'D' means deleted in visibility
    ///     +  . . 4 8      // ^ comments are ignored
    ///     -  . . 3 4",
    /// );
    /// //  ^ operations:
    /// //     +: Insert
    /// //     -: Delete
    /// //    U+: UpdateInsert
    /// //    U-: UpdateDelete
    ///
    /// // type chars:
    /// //     I: i64
    /// //     i: i32
    /// //     F: f64
    /// //     f: f32
    /// //     T: str
    /// //    TS: Timestamp
    /// //    TZ: Timestamptz
    /// //   SRL: Serial
    /// //   x[]: array of x
    /// // <i,f>: struct
    /// ```
    pub fn from_pretty(s: &str) -> Self {
        let mut chunk_str = String::new();
        let mut ops = vec![];

        let (header, body) = match s.split_once('\n') {
            Some(pair) => pair,
            None => {
                // empty chunk
                return StreamChunk {
                    ops: Arc::new([]),
                    data: DataChunk::from_pretty(s),
                };
            }
        };
        chunk_str.push_str(header);
        chunk_str.push('\n');

        // Strip the leading op token from each body line; the remainder is
        // accumulated and parsed by `DataChunk::from_pretty`.
        for line in body.split_inclusive('\n') {
            if line.trim_start().is_empty() {
                continue;
            }
            let (op, row) = line
                .trim_start()
                .split_once(|c: char| c.is_ascii_whitespace())
                .ok_or_else(|| panic!("missing operation: {line:?}"))
                .unwrap();
            ops.push(match op {
                "+" => Op::Insert,
                "-" => Op::Delete,
                "U+" => Op::UpdateInsert,
                "U-" => Op::UpdateDelete,
                t => panic!("invalid op: {t:?}"),
            });
            chunk_str.push_str(row);
        }
        StreamChunk {
            ops: ops.into(),
            data: DataChunk::from_pretty(&chunk_str),
        }
    }

    /// Validate the `StreamChunk` layout.
    pub fn valid(&self) -> bool {
        // The ops, visibility bitmap and every column must agree on length.
        let len = self.ops.len();
        let data = &self.data;
        data.visibility().len() == len && data.columns().iter().all(|col| col.len() == len)
    }

    /// Concatenate multiple `StreamChunk` into one.
    ///
    /// Panics if `chunks` is empty.
    pub fn concat(chunks: Vec<StreamChunk>) -> StreamChunk {
        let data_types = chunks[0].data_types();
        let size = chunks.iter().map(|c| c.cardinality()).sum::<usize>();

        // Unlimited builder sized to the total cardinality: `append_row` never flushes.
        let mut builder = StreamChunkBuilder::unlimited(data_types, Some(size));

        for chunk in chunks {
            // TODO: directly append chunks.
            for (op, row) in chunk.rows() {
                let none = builder.append_row(op, row);
                debug_assert!(none.is_none());
            }
        }

        builder.take().expect("chunk should not be empty")
    }

    /// Sort rows by `(op, row content)`, for deterministic comparison in tests.
    pub fn sort_rows(self) -> Self {
        if self.capacity() == 0 {
            return self;
        }
        // NOTE(review): `rows()` yields only visible rows, but `idx` ranges over
        // `capacity()` — this indexes out of bounds if any row is invisible.
        // Assumed to be called on compacted chunks only; confirm.
        let rows = self.rows().collect_vec();
        let mut idx = (0..self.capacity()).collect_vec();
        idx.sort_by_key(|&i| {
            let (op, row_ref) = rows[i];
            (op, DefaultOrdered(row_ref))
        });
        StreamChunk {
            ops: idx.iter().map(|&i| self.ops[i]).collect(),
            data: self.data.reorder_rows(&idx),
        }
    }

    /// Generate `num_of_chunks` data chunks with type `data_types`,
    /// where each data chunk has cardinality of `chunk_size`.
    /// TODO(kwannoel): Generate different types of op, different vis.
    pub fn gen_stream_chunks(
        num_of_chunks: usize,
        chunk_size: usize,
        data_types: &[DataType],
        varchar_properties: &VarcharProperty,
    ) -> Vec<StreamChunk> {
        Self::gen_stream_chunks_inner(
            num_of_chunks,
            chunk_size,
            data_types,
            varchar_properties,
            1.0,
            1.0,
        )
    }

    /// Generate `num_of_chunks` stream chunks with type `data_types`,
    /// where each chunk has cardinality of `chunk_size`.
    ///
    /// `inserts_percent` is the fraction of `Insert` ops (the rest are `Delete`),
    /// sampled with a fixed seed so the output is deterministic. The same op
    /// vector is reused for every generated chunk.
    pub fn gen_stream_chunks_inner(
        num_of_chunks: usize,
        chunk_size: usize,
        data_types: &[DataType],
        varchar_properties: &VarcharProperty,
        visibility_percent: f64, // % of rows that are visible
        inserts_percent: f64,    // Rest will be deletes.
    ) -> Vec<StreamChunk> {
        // Fast paths skip the RNG entirely when the ops are homogeneous.
        let ops = if inserts_percent == 0.0 {
            vec![Op::Delete; chunk_size]
        } else if inserts_percent == 1.0 {
            vec![Op::Insert; chunk_size]
        } else {
            let mut rng = SmallRng::from_seed([0; 32]);
            let mut ops = vec![];
            for _ in 0..chunk_size {
                ops.push(if rng.random_bool(inserts_percent) {
                    Op::Insert
                } else {
                    Op::Delete
                });
            }
            ops
        };
        DataChunk::gen_data_chunks(
            num_of_chunks,
            chunk_size,
            data_types,
            varchar_properties,
            visibility_percent,
        )
        .into_iter()
        .map(|chunk| StreamChunk::from_parts(ops.clone(), chunk))
        .collect()
    }
}
812
#[cfg(test)]
mod tests {
    use super::*;

    /// `to_pretty` renders ops right-aligned and NULLs as blank cells.
    #[test]
    fn test_to_pretty_string() {
        let chunk = StreamChunk::from_pretty(
            "  I I
             + 1 6
             - 2 .
            U- 3 7
            U+ 4 .",
        );
        assert_eq!(
            chunk.to_pretty().to_string(),
            "\
+----+---+---+
|  + | 1 | 6 |
|  - | 2 |   |
| U- | 3 | 7 |
| U+ | 4 |   |
+----+---+---+"
        );
    }

    /// Splitting with size 2 yields two even halves; the `U-`/`U+` pair
    /// happens to land in one chunk.
    #[test]
    fn test_split_1() {
        let chunk = StreamChunk::from_pretty(
            "  I I
             + 1 6
             - 2 .
            U- 3 7
            U+ 4 .",
        );
        let results = chunk.split(2);
        assert_eq!(2, results.len());
        assert_eq!(
            results[0].to_pretty().to_string(),
            "\
+---+---+---+
| + | 1 | 6 |
| - | 2 |   |
+---+---+---+"
        );
        assert_eq!(
            results[1].to_pretty().to_string(),
            "\
+----+---+---+
| U- | 3 | 7 |
| U+ | 4 |   |
+----+---+---+"
        );
    }

    /// A `U-`/`U+` pair straddling the size boundary is kept together,
    /// so the first chunk gets `size + 1` rows and the remainder is smaller.
    #[test]
    fn test_split_2() {
        let chunk = StreamChunk::from_pretty(
            "  I I
             + 1 6
            U- 3 7
            U+ 4 .
             - 2 .",
        );
        let results = chunk.split(2);
        assert_eq!(2, results.len());
        assert_eq!(
            results[0].to_pretty().to_string(),
            "\
+----+---+---+
|  + | 1 | 6 |
| U- | 3 | 7 |
| U+ | 4 |   |
+----+---+---+"
        );
        assert_eq!(
            results[1].to_pretty().to_string(),
            "\
+---+---+---+
| - | 2 |   |
+---+---+---+"
        );
    }

    /// Adjacent delete/insert (and insert/delete) pairs with identical row
    /// values are hidden; invisible (`D`) rows are skipped entirely.
    #[test]
    fn test_eliminate_adjacent_noop_update() {
        let c = StreamChunk::from_pretty(
            "  I I
            - 1 6 D
            - 2 2
            + 2 3
            - 2 3
            + 1 6
            - 1 7
            + 1 10 D
            + 1 7
            U- 3 7
            U+ 3 7
            + 2 3",
        );
        let c = c.eliminate_adjacent_noop_update();
        assert_eq!(
            c.to_pretty().to_string(),
            "\
+---+---+---+
| - | 2 | 2 |
| + | 1 | 6 |
| + | 2 | 3 |
+---+---+---+"
        );
    }
}