risingwave_common/array/
stream_chunk.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Display;
16use std::mem::size_of;
17use std::ops::{Deref, DerefMut};
18use std::sync::{Arc, LazyLock};
19use std::{fmt, mem};
20
21use either::Either;
22use enum_as_inner::EnumAsInner;
23use itertools::Itertools;
24use rand::prelude::SmallRng;
25use rand::{Rng, SeedableRng};
26use risingwave_common_estimate_size::EstimateSize;
27use risingwave_pb::data::{PbOp, PbStreamChunk};
28
29use super::stream_chunk_builder::StreamChunkBuilder;
30use super::{ArrayImpl, ArrayRef, ArrayResult, DataChunkTestExt, RowRef};
31use crate::array::DataChunk;
32use crate::bitmap::{Bitmap, BitmapBuilder};
33use crate::catalog::Schema;
34use crate::field_generator::VarcharProperty;
35use crate::row::Row;
36use crate::types::{DataType, DefaultOrdered, ToText};
37
/// `Op` represents three operations in `StreamChunk`.
///
/// `UpdateDelete` and `UpdateInsert` are semantically equivalent to `Delete` and `Insert`
/// but always appear in pairs to represent an update operation. It's guaranteed that
/// they are adjacent to each other in the same `StreamChunk`, and the stream key of the two
/// rows are the same.
///
/// Note: the derived `Ord` follows declaration order
/// (`Insert < Delete < UpdateDelete < UpdateInsert`), which e.g. `sort_rows` relies on.
/// Do not reorder the variants.
#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash, EnumAsInner)]
pub enum Op {
    Insert,
    Delete,
    UpdateDelete,
    UpdateInsert,
}
51
52impl Op {
53    pub fn to_protobuf(self) -> PbOp {
54        match self {
55            Op::Insert => PbOp::Insert,
56            Op::Delete => PbOp::Delete,
57            Op::UpdateInsert => PbOp::UpdateInsert,
58            Op::UpdateDelete => PbOp::UpdateDelete,
59        }
60    }
61
62    pub fn from_protobuf(prost: &i32) -> ArrayResult<Op> {
63        let op = match PbOp::try_from(*prost) {
64            Ok(PbOp::Insert) => Op::Insert,
65            Ok(PbOp::Delete) => Op::Delete,
66            Ok(PbOp::UpdateInsert) => Op::UpdateInsert,
67            Ok(PbOp::UpdateDelete) => Op::UpdateDelete,
68            Ok(PbOp::Unspecified) => unreachable!(),
69            Err(_) => bail!("No such op type"),
70        };
71        Ok(op)
72    }
73
74    /// convert `UpdateDelete` to `Delete` and `UpdateInsert` to Insert
75    pub fn normalize_update(self) -> Op {
76        match self {
77            Op::Insert => Op::Insert,
78            Op::Delete => Op::Delete,
79            Op::UpdateDelete => Op::Delete,
80            Op::UpdateInsert => Op::Insert,
81        }
82    }
83
84    pub fn to_i16(self) -> i16 {
85        match self {
86            Op::Insert => 1,
87            Op::Delete => 2,
88            Op::UpdateInsert => 3,
89            Op::UpdateDelete => 4,
90        }
91    }
92
93    pub fn to_varchar(self) -> String {
94        match self {
95            Op::Insert => "Insert",
96            Op::Delete => "Delete",
97            Op::UpdateInsert => "UpdateInsert",
98            Op::UpdateDelete => "UpdateDelete",
99        }
100        .to_owned()
101    }
102}
103
/// `StreamChunk` is used to pass data over the streaming pathway.
#[derive(Clone, PartialEq)]
pub struct StreamChunk {
    // TODO: Optimize using bitmap
    /// One op per row; its length matches every column's length in `data`
    /// (asserted in `with_visibility`).
    ops: Arc<[Op]>,
    /// The underlying row data and visibility bitmap.
    data: DataChunk,
}
111
112impl Default for StreamChunk {
113    /// Create a 0-row-0-col `StreamChunk`. Only used in some existing tests.
114    /// This is NOT the same as an **empty** chunk, which has 0 rows but with
115    /// columns aligned with executor schema.
116    fn default() -> Self {
117        Self {
118            ops: Arc::new([]),
119            data: DataChunk::new(vec![], 0),
120        }
121    }
122}
123
impl StreamChunk {
    /// Create a new `StreamChunk` with given ops and columns.
    pub fn new(ops: impl Into<Arc<[Op]>>, columns: Vec<ArrayRef>) -> Self {
        let ops = ops.into();
        // All rows are visible by default.
        let visibility = Bitmap::ones(ops.len());
        Self::with_visibility(ops, columns, visibility)
    }

    /// Create a new `StreamChunk` with given ops, columns and visibility.
    ///
    /// # Panics
    /// Panics if any column's length differs from the number of ops.
    pub fn with_visibility(
        ops: impl Into<Arc<[Op]>>,
        columns: Vec<ArrayRef>,
        visibility: Bitmap,
    ) -> Self {
        let ops = ops.into();
        for col in &columns {
            assert_eq!(col.len(), ops.len());
        }
        let data = DataChunk::new(columns, visibility);
        StreamChunk { ops, data }
    }

    /// Build a `StreamChunk` from rows.
    ///
    /// Panics if the `rows` is empty.
    ///
    /// Should prefer using [`StreamChunkBuilder`] instead to avoid unnecessary
    /// allocation of rows.
    pub fn from_rows(rows: &[(Op, impl Row)], data_types: &[DataType]) -> Self {
        let mut builder = StreamChunkBuilder::unlimited(data_types.to_vec(), Some(rows.len()));

        for (op, row) in rows {
            // The builder is unlimited, so it never yields an intermediate chunk here.
            let none = builder.append_row(*op, row);
            debug_assert!(none.is_none());
        }

        builder.take().expect("chunk should not be empty")
    }

    /// Create a 0-row chunk whose columns are aligned with the given `data_types`.
    pub fn empty(data_types: &[DataType]) -> Self {
        StreamChunkBuilder::build_empty(data_types.to_vec())
    }

    /// Get the reference of the underlying data chunk.
    pub fn data_chunk(&self) -> &DataChunk {
        &self.data
    }

    /// Removes the invisible rows based on `visibility`. Returns a new compacted chunk
    /// with all rows visible.
    ///
    /// This does not change the visible content of the chunk. Not to be confused with
    /// `StreamChunkCompactor`, which removes unnecessary changes based on the key.
    ///
    /// See [`DataChunk::compact_vis`] for more details.
    pub fn compact_vis(self) -> Self {
        // Fast path: all rows already visible, nothing to do.
        if self.is_vis_compacted() {
            return self;
        }

        let (ops, columns, visibility) = self.into_inner();

        // Number of visible rows, i.e. cardinality of the compacted chunk.
        let cardinality = visibility
            .iter()
            .fold(0, |vis_cnt, vis| vis_cnt + vis as usize);
        let columns: Vec<_> = columns
            .into_iter()
            .map(|col| col.compact_vis(&visibility, cardinality).into())
            .collect();
        // Keep only the ops of visible rows, in order.
        let mut new_ops = Vec::with_capacity(cardinality);
        for idx in visibility.iter_ones() {
            new_ops.push(ops[idx]);
        }
        StreamChunk::new(new_ops, columns)
    }

    /// Split the `StreamChunk` into multiple chunks with the given size at most.
    ///
    /// When the total cardinality of all the chunks is not evenly divided by the `size`,
    /// the last new chunk will be the remainder.
    ///
    /// For consecutive `UpdateDelete` and `UpdateInsert`, they will be kept in one chunk.
    /// As a result, some chunks may have `size + 1` rows.
    pub fn split(&self, size: usize) -> Vec<Self> {
        let mut builder = StreamChunkBuilder::new(size, self.data_types());
        let mut outputs = Vec::new();

        // TODO: directly append the chunk.
        for (op, row) in self.rows() {
            if let Some(chunk) = builder.append_row(op, row) {
                outputs.push(chunk);
            }
        }
        // Flush the remainder, if any.
        if let Some(output) = builder.take() {
            outputs.push(output);
        }

        outputs
    }

    /// Decompose into the underlying data chunk and ops.
    pub fn into_parts(self) -> (DataChunk, Arc<[Op]>) {
        (self.data, self.ops)
    }

    /// Assemble a `StreamChunk` from ops and an existing data chunk
    /// (keeping the data chunk's visibility).
    pub fn from_parts(ops: impl Into<Arc<[Op]>>, data_chunk: DataChunk) -> Self {
        let (columns, vis) = data_chunk.into_parts();
        Self::with_visibility(ops, columns, vis)
    }

    /// Decompose into ops, columns and visibility bitmap.
    pub fn into_inner(self) -> (Arc<[Op]>, Vec<ArrayRef>, Bitmap) {
        let (columns, vis) = self.data.into_parts();
        (self.ops, columns, vis)
    }

    /// Serialize to protobuf. The chunk is compacted first so that only visible
    /// rows are encoded (visibility is not part of the wire format).
    pub fn to_protobuf(&self) -> PbStreamChunk {
        if !self.is_vis_compacted() {
            return self.clone().compact_vis().to_protobuf();
        }
        PbStreamChunk {
            cardinality: self.cardinality() as u32,
            ops: self.ops.iter().map(|op| op.to_protobuf() as i32).collect(),
            columns: self.columns().iter().map(|col| col.to_protobuf()).collect(),
        }
    }

    /// Deserialize from protobuf. Returns an error if an op or column fails to decode.
    pub fn from_protobuf(prost: &PbStreamChunk) -> ArrayResult<Self> {
        let cardinality = prost.get_cardinality() as usize;
        let mut ops = Vec::with_capacity(cardinality);
        for op in prost.get_ops() {
            ops.push(Op::from_protobuf(op)?);
        }
        let mut columns = vec![];
        for column in prost.get_columns() {
            columns.push(ArrayImpl::from_protobuf(column, cardinality)?.into());
        }
        Ok(StreamChunk::new(ops, columns))
    }

    /// Get the ops of the chunk, one per row (including invisible rows).
    pub fn ops(&self) -> &[Op] {
        &self.ops
    }

    /// Returns a table-like text representation of the `StreamChunk`.
    pub fn to_pretty(&self) -> impl Display + use<> {
        self.to_pretty_inner(None)
    }

    /// Returns a table-like text representation of the `StreamChunk` with a header of column names
    /// from the given `schema`.
    pub fn to_pretty_with_schema(&self, schema: &Schema) -> impl Display + use<> {
        self.to_pretty_inner(Some(schema))
    }

    /// Shared implementation of the pretty-printers; only visible rows are rendered.
    fn to_pretty_inner(&self, schema: Option<&Schema>) -> impl Display + use<> {
        use comfy_table::{Cell, CellAlignment, Table};

        if self.cardinality() == 0 {
            return Either::Left("(empty)");
        }

        let mut table = Table::new();
        table.load_preset(DataChunk::PRETTY_TABLE_PRESET);

        if let Some(schema) = schema {
            assert_eq!(self.dimension(), schema.len());
            // First header cell is empty: it sits above the op column.
            let cells = std::iter::once(String::new())
                .chain(schema.fields().iter().map(|f| f.name.clone()));
            table.set_header(cells);
        }

        for (op, row_ref) in self.rows() {
            let mut cells = Vec::with_capacity(row_ref.len() + 1);
            cells.push(
                Cell::new(match op {
                    Op::Insert => "+",
                    Op::Delete => "-",
                    Op::UpdateDelete => "U-",
                    Op::UpdateInsert => "U+",
                })
                .set_alignment(CellAlignment::Right),
            );
            for datum in row_ref.iter() {
                let str = match datum {
                    None => "".to_owned(), // NULL
                    Some(scalar) => scalar.to_text(),
                };
                cells.push(Cell::new(str));
            }
            table.add_row(cells);
        }

        Either::Right(table)
    }

    /// Reorder (and possibly remove) columns.
    ///
    /// e.g. if `indices` is `[2, 1, 0]`, and the chunk contains column `[a, b, c]`, then the output
    /// will be `[c, b, a]`. If `indices` is [2, 0], then the output will be `[c, a]`.
    /// If the input mapping is identity mapping, no reorder will be performed.
    pub fn project(&self, indices: &[usize]) -> Self {
        Self {
            ops: self.ops.clone(),
            data: self.data.project(indices),
        }
    }

    /// Remove the adjacent delete-insert and insert-deletes if their row value are the same.
    pub fn eliminate_adjacent_noop_update(self) -> Self {
        let len = self.data_chunk().capacity();
        let mut c: StreamChunkMut = self.into();
        // Index of the previous visible row that has not been eliminated yet.
        let mut prev_r = None;
        for curr in 0..len {
            if !c.vis(curr) {
                continue;
            }
            if let Some(prev) = prev_r
                && (
                    // 1. Delete then Insert
                    (matches!(c.op(prev), Op::UpdateDelete | Op::Delete)
                    && matches!(c.op(curr), Op::UpdateInsert | Op::Insert))
                    ||
                    // 2. Insert then Delete
                    //
                    // Note that after eliminating `U+` and `U-` here, we will get a new
                    // pair of `U-` and `U+` consisting of `prev.prev` and the `next` row.
                    // `prev.prev` and `prev`, `curr` and `next` share the same stream key
                    // because they are `U-` and `U+` pairs, while `prev` and `curr` share
                    // the same stream key because they are equal, so `prev-prev` and `next`
                    // must also share the same stream key. Therefore, it's okay to leave
                    // their `Update` ops unchanged.
                    //
                    // Also, they can't be the same, otherwise these 4 rows are the same,
                    // which should already be eliminated by the first branch.
                    (matches!(c.op(prev), Op::UpdateInsert | Op::Insert)
                    && matches!(c.op(curr), Op::UpdateDelete | Op::Delete))
                )
                && c.row_ref(prev) == c.row_ref(curr)
            {
                // The pair cancels out: hide both rows.
                c.set_vis(prev, false);
                c.set_vis(curr, false);
                prev_r = None;
            } else {
                prev_r = Some(curr);
            }
        }

        // Normalize update pairs that became partially invisible.
        // If only U- is visible, turn it into Delete; if only U+ is visible, turn it into Insert.
        for idx in 0..len.saturating_sub(1) {
            if c.op(idx) == Op::UpdateDelete && c.op(idx + 1) == Op::UpdateInsert {
                let delete_vis = c.vis(idx);
                let insert_vis = c.vis(idx + 1);
                if delete_vis && !insert_vis {
                    c.set_op(idx, Op::Delete);
                } else if !delete_vis && insert_vis {
                    c.set_op(idx + 1, Op::Insert);
                }
            }
        }
        c.into()
    }

    /// Reorder columns and set visibility.
    pub fn project_with_vis(&self, indices: &[usize], vis: Bitmap) -> Self {
        Self {
            ops: self.ops.clone(),
            data: self.data.project_with_vis(indices, vis),
        }
    }

    /// Clone the `StreamChunk` with a new visibility.
    pub fn clone_with_vis(&self, vis: Bitmap) -> Self {
        Self {
            ops: self.ops.clone(),
            data: self.data.with_visibility(vis),
        }
    }
}
402
/// Delegate read-only `DataChunk` methods (e.g. `cardinality`, `capacity`)
/// to the underlying data chunk.
impl Deref for StreamChunk {
    type Target = DataChunk;

    fn deref(&self) -> &Self::Target {
        &self.data
    }
}
410
/// Mutable delegation to the underlying `DataChunk`.
impl DerefMut for StreamChunk {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.data
    }
}
416
417/// `StreamChunk` can be created from `DataChunk` with all operations set to `Insert`.
418impl From<DataChunk> for StreamChunk {
419    fn from(data: DataChunk) -> Self {
420        Self::from_parts(vec![Op::Insert; data.capacity()], data)
421    }
422}
423
424impl fmt::Debug for StreamChunk {
425    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
426        if f.alternate() {
427            write!(
428                f,
429                "StreamChunk {{ cardinality: {}, capacity: {}, data:\n{}\n }}",
430                self.cardinality(),
431                self.capacity(),
432                self.to_pretty()
433            )
434        } else {
435            f.debug_struct("StreamChunk")
436                .field("cardinality", &self.cardinality())
437                .field("capacity", &self.capacity())
438                .finish_non_exhaustive()
439        }
440    }
441}
442
443impl EstimateSize for StreamChunk {
444    fn estimated_heap_size(&self) -> usize {
445        self.data.estimated_heap_size() + self.ops.len() * size_of::<Op>()
446    }
447}
448
/// Copy-on-write state for the ops of a mutable stream chunk.
enum OpsMutState {
    /// Shared, unmodified ops borrowed from the original chunk.
    ArcRef(Arc<[Op]>),
    /// Owned ops; entered on the first mutation.
    Mut(Vec<Op>),
}

impl OpsMutState {
    // Placeholder used while transitioning `ArcRef` -> `Mut` in `OpsMut::set`.
    // `Vec::new()` does not allocate, so this is free to construct.
    const UNDEFINED: Self = Self::Mut(Vec::new());
}
457
/// Mutable view over an ops slice with copy-on-write semantics:
/// the shared `Arc` is cloned into a `Vec` only on the first write.
pub struct OpsMut {
    state: OpsMutState,
}
461
462impl OpsMut {
463    pub fn new(ops: Arc<[Op]>) -> Self {
464        Self {
465            state: OpsMutState::ArcRef(ops),
466        }
467    }
468
469    pub fn len(&self) -> usize {
470        match &self.state {
471            OpsMutState::ArcRef(v) => v.len(),
472            OpsMutState::Mut(v) => v.len(),
473        }
474    }
475
476    pub fn is_empty(&self) -> bool {
477        self.len() == 0
478    }
479
480    pub fn set(&mut self, n: usize, val: Op) {
481        debug_assert!(n < self.len());
482        if let OpsMutState::Mut(v) = &mut self.state {
483            v[n] = val;
484        } else {
485            let state = mem::replace(&mut self.state, OpsMutState::UNDEFINED); // intermediate state
486            let mut v = match state {
487                OpsMutState::ArcRef(v) => v.to_vec(),
488                OpsMutState::Mut(_) => unreachable!(),
489            };
490            v[n] = val;
491            self.state = OpsMutState::Mut(v);
492        }
493    }
494
495    pub fn get(&self, n: usize) -> Op {
496        debug_assert!(n < self.len());
497        match &self.state {
498            OpsMutState::ArcRef(v) => v[n],
499            OpsMutState::Mut(v) => v[n],
500        }
501    }
502}
impl From<OpsMut> for Arc<[Op]> {
    /// Extract the ops; reuses the original `Arc` when no mutation ever happened.
    fn from(v: OpsMut) -> Self {
        match v.state {
            OpsMutState::ArcRef(a) => a,
            OpsMutState::Mut(v) => v.into(),
        }
    }
}
511
/// A mutable wrapper for `StreamChunk`. can only set the visibilities and ops in place, can not
/// change the length.
pub struct StreamChunkMut {
    /// Column data; never mutated through this wrapper.
    columns: Arc<[ArrayRef]>,
    /// Ops, copy-on-write mutable.
    ops: OpsMut,
    /// Visibility bits, mutable in place.
    vis: BitmapBuilder,
}
519
520impl From<StreamChunk> for StreamChunkMut {
521    fn from(c: StreamChunk) -> Self {
522        let (c, ops) = c.into_parts();
523        let (columns, vis) = c.into_parts_v2();
524        Self {
525            columns,
526            ops: OpsMut::new(ops),
527            vis: vis.into(),
528        }
529    }
530}
531
532impl From<StreamChunkMut> for StreamChunk {
533    fn from(c: StreamChunkMut) -> Self {
534        StreamChunk::from_parts(c.ops, DataChunk::from_parts(c.columns, c.vis.finish()))
535    }
536}
537
/// A reference to a single row of a [`StreamChunkMut`], allowing its op and
/// visibility (but not its column values) to be modified in place.
pub struct OpRowMutRef<'a> {
    /// The chunk this row belongs to.
    c: &'a mut StreamChunkMut,
    /// Row index within the chunk.
    i: usize,
}
542
// Act as a placeholder value when using in `ChangeBuffer`.
impl Default for OpRowMutRef<'_> {
    fn default() -> Self {
        // Lazily-initialized 0-row chunk shared by all placeholder values.
        static mut DUMMY_CHUNK_MUT: LazyLock<StreamChunkMut> =
            LazyLock::new(|| StreamChunk::default().into());

        // NOTE(review): every call hands out a `&'static mut` to the same
        // `static mut`, so two coexisting placeholders alias mutably — this is
        // presumably sound only because placeholders are never dereferenced;
        // confirm at call sites.
        #[allow(clippy::deref_addrof)] // false positive
        OpRowMutRef {
            c: unsafe { &mut *(&raw mut DUMMY_CHUNK_MUT) },
            i: 0,
        }
    }
}
556
/// Equality compares only the row content; the op and visibility are ignored.
impl PartialEq for OpRowMutRef<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.row_ref() == other.row_ref()
    }
}
impl Eq for OpRowMutRef<'_> {}
563
impl<'a> OpRowMutRef<'a> {
    /// Row index within the chunk.
    pub fn index(&self) -> usize {
        self.i
    }

    /// Whether this row is visible.
    pub fn vis(&self) -> bool {
        self.c.vis.is_set(self.i)
    }

    /// The op of this row.
    pub fn op(&self) -> Op {
        self.c.ops.get(self.i)
    }

    /// Set the visibility of this row.
    pub fn set_vis(&mut self, val: bool) {
        self.c.set_vis(self.i, val);
    }

    /// Set the op of this row.
    pub fn set_op(&mut self, val: Op) {
        self.c.set_op(self.i, val);
    }

    /// Borrow the column values of this row.
    pub fn row_ref(&self) -> RowRef<'_> {
        RowRef::with_columns(self.c.columns(), self.i)
    }

    /// return if the two row ref is in the same chunk
    pub fn same_chunk(&self, other: &Self) -> bool {
        std::ptr::eq(self.c, other.c)
    }
}
594
impl StreamChunkMut {
    /// Total number of rows, including invisible ones.
    pub fn capacity(&self) -> usize {
        self.vis.len()
    }

    /// Whether row `i` is visible.
    pub fn vis(&self, i: usize) -> bool {
        self.vis.is_set(i)
    }

    /// The op of row `i`.
    pub fn op(&self, i: usize) -> Op {
        self.ops.get(i)
    }

    /// Borrow the column values of row `i`.
    pub fn row_ref(&self, i: usize) -> RowRef<'_> {
        RowRef::with_columns(self.columns(), i)
    }

    /// Set the visibility of row `n`.
    pub fn set_vis(&mut self, n: usize, val: bool) {
        self.vis.set(n, val);
    }

    /// Set the op of row `n`.
    pub fn set_op(&mut self, n: usize, val: Op) {
        self.ops.set(n, val);
    }

    /// The columns of the chunk.
    pub fn columns(&self) -> &[ArrayRef] {
        &self.columns
    }

    /// get the mut reference of the stream chunk.
    pub fn to_rows_mut(&mut self) -> impl Iterator<Item = (RowRef<'_>, OpRowMutRef<'_>)> {
        // SAFETY: `OpRowMutRef` can only mutate the visibility and ops, which is safe even
        // when the columns are borrowed by `RowRef` at the same time.
        //
        // NOTE(review): this also hands out an aliasing `&mut self` per yielded item via
        // the raw-pointer round-trip below; soundness relies on `OpRowMutRef` never
        // touching `columns` — confirm if this invariant is documented elsewhere.
        unsafe {
            (0..self.vis.len())
                .filter(|i| self.vis.is_set(*i))
                .map(|i| {
                    let p = self as *const StreamChunkMut;
                    let p = p as *mut StreamChunkMut;
                    (
                        RowRef::with_columns(self.columns(), i),
                        OpRowMutRef { c: &mut *p, i },
                    )
                })
        }
    }
}
642
/// Test utilities for [`StreamChunk`].
#[easy_ext::ext(StreamChunkTestExt)]
impl StreamChunk {
    /// Parse a chunk from string.
    ///
    /// See also [`DataChunkTestExt::from_pretty`].
    ///
    /// # Format
    ///
    /// The first line is a header indicating the column types.
    /// The following lines indicate rows within the chunk.
    /// Each line starts with an operation followed by values.
    /// NULL values are represented as `.`.
    ///
    /// # Example
    /// ```
    /// use risingwave_common::array::StreamChunk;
    /// use risingwave_common::array::stream_chunk::StreamChunkTestExt as _;
    /// let chunk = StreamChunk::from_pretty(
    ///     "  I I I I      // type chars
    ///     U- 2 5 . .      // '.' means NULL
    ///     U+ 2 5 2 6 D    // 'D' means deleted in visibility
    ///     +  . . 4 8      // ^ comments are ignored
    ///     -  . . 3 4",
    /// );
    /// //  ^ operations:
    /// //     +: Insert
    /// //     -: Delete
    /// //    U+: UpdateInsert
    /// //    U-: UpdateDelete
    ///
    /// // type chars:
    /// //     I: i64
    /// //     i: i32
    /// //     F: f64
    /// //     f: f32
    /// //     T: str
    /// //    TS: Timestamp
    /// //    TZ: Timestamptz
    /// //   SRL: Serial
    /// //   x[]: array of x
    /// // <i,f>: struct
    /// ```
    pub fn from_pretty(s: &str) -> Self {
        // The op tokens are stripped off each row; the remainder is forwarded
        // to `DataChunk::from_pretty` for value parsing.
        let mut chunk_str = String::new();
        let mut ops = vec![];

        let (header, body) = match s.split_once('\n') {
            Some(pair) => pair,
            None => {
                // empty chunk
                return StreamChunk {
                    ops: Arc::new([]),
                    data: DataChunk::from_pretty(s),
                };
            }
        };
        chunk_str.push_str(header);
        chunk_str.push('\n');

        for line in body.split_inclusive('\n') {
            // Skip blank lines.
            if line.trim_start().is_empty() {
                continue;
            }
            // Split the leading op token from the rest of the row.
            let (op, row) = line
                .trim_start()
                .split_once(|c: char| c.is_ascii_whitespace())
                .ok_or_else(|| panic!("missing operation: {line:?}"))
                .unwrap();
            ops.push(match op {
                "+" => Op::Insert,
                "-" => Op::Delete,
                "U+" => Op::UpdateInsert,
                "U-" => Op::UpdateDelete,
                t => panic!("invalid op: {t:?}"),
            });
            chunk_str.push_str(row);
        }
        StreamChunk {
            ops: ops.into(),
            data: DataChunk::from_pretty(&chunk_str),
        }
    }

    /// Validate the `StreamChunk` layout.
    ///
    /// Checks that the visibility bitmap and every column are as long as the ops.
    pub fn valid(&self) -> bool {
        let len = self.ops.len();
        let data = &self.data;
        data.visibility().len() == len && data.columns().iter().all(|col| col.len() == len)
    }

    /// Concatenate multiple `StreamChunk` into one.
    ///
    /// Panics if `chunks` is empty.
    pub fn concat(chunks: Vec<StreamChunk>) -> StreamChunk {
        // Data types are taken from the first chunk; all chunks are assumed to agree.
        let data_types = chunks[0].data_types();
        let size = chunks.iter().map(|c| c.cardinality()).sum::<usize>();

        let mut builder = StreamChunkBuilder::unlimited(data_types, Some(size));

        for chunk in chunks {
            // TODO: directly append chunks.
            for (op, row) in chunk.rows() {
                // Unlimited builder never yields an intermediate chunk.
                let none = builder.append_row(op, row);
                debug_assert!(none.is_none());
            }
        }

        builder.take().expect("chunk should not be empty")
    }

    /// Sort rows by `(op, row)`, e.g. to make chunk comparison deterministic in tests.
    pub fn sort_rows(self) -> Self {
        if self.capacity() == 0 {
            return self;
        }
        let rows = self.rows().collect_vec();
        let mut idx = (0..self.capacity()).collect_vec();
        idx.sort_by_key(|&i| {
            let (op, row_ref) = rows[i];
            (op, DefaultOrdered(row_ref))
        });
        StreamChunk {
            ops: idx.iter().map(|&i| self.ops[i]).collect(),
            data: self.data.reorder_rows(&idx),
        }
    }

    /// Generate `num_of_chunks` data chunks with type `data_types`,
    /// where each data chunk has cardinality of `chunk_size`.
    /// TODO(kwannoel): Generate different types of op, different vis.
    pub fn gen_stream_chunks(
        num_of_chunks: usize,
        chunk_size: usize,
        data_types: &[DataType],
        varchar_properties: &VarcharProperty,
    ) -> Vec<StreamChunk> {
        Self::gen_stream_chunks_inner(
            num_of_chunks,
            chunk_size,
            data_types,
            varchar_properties,
            1.0,
            1.0,
        )
    }

    /// Like [`Self::gen_stream_chunks`], but with control over visibility and
    /// the insert/delete ratio. All generated chunks share the same ops pattern.
    pub fn gen_stream_chunks_inner(
        num_of_chunks: usize,
        chunk_size: usize,
        data_types: &[DataType],
        varchar_properties: &VarcharProperty,
        visibility_percent: f64, // % of rows that are visible
        inserts_percent: f64,    // Rest will be deletes.
    ) -> Vec<StreamChunk> {
        let ops = if inserts_percent == 0.0 {
            vec![Op::Delete; chunk_size]
        } else if inserts_percent == 1.0 {
            vec![Op::Insert; chunk_size]
        } else {
            // Fixed seed so generated chunks are reproducible across runs.
            let mut rng = SmallRng::from_seed([0; 32]);
            let mut ops = vec![];
            for _ in 0..chunk_size {
                ops.push(if rng.random_bool(inserts_percent) {
                    Op::Insert
                } else {
                    Op::Delete
                });
            }
            ops
        };
        DataChunk::gen_data_chunks(
            num_of_chunks,
            chunk_size,
            data_types,
            varchar_properties,
            visibility_percent,
        )
        .into_iter()
        .map(|chunk| StreamChunk::from_parts(ops.clone(), chunk))
        .collect()
    }
}
826
#[cfg(test)]
mod tests {
    use super::*;

    // Pretty-printing renders ops right-aligned and NULLs as empty cells.
    #[test]
    fn test_to_pretty_string() {
        let chunk = StreamChunk::from_pretty(
            "  I I
             + 1 6
             - 2 .
            U- 3 7
            U+ 4 .",
        );
        assert_eq!(
            chunk.to_pretty().to_string(),
            "\
+----+---+---+
|  + | 1 | 6 |
|  - | 2 |   |
| U- | 3 | 7 |
| U+ | 4 |   |
+----+---+---+"
        );
    }

    // Even split: the U-/U+ pair lands exactly at a chunk boundary.
    #[test]
    fn test_split_1() {
        let chunk = StreamChunk::from_pretty(
            "  I I
             + 1 6
             - 2 .
            U- 3 7
            U+ 4 .",
        );
        let results = chunk.split(2);
        assert_eq!(2, results.len());
        assert_eq!(
            results[0].to_pretty().to_string(),
            "\
+---+---+---+
| + | 1 | 6 |
| - | 2 |   |
+---+---+---+"
        );
        assert_eq!(
            results[1].to_pretty().to_string(),
            "\
+----+---+---+
| U- | 3 | 7 |
| U+ | 4 |   |
+----+---+---+"
        );
    }

    // The U-/U+ pair straddles the split point, so the first chunk gets
    // `size + 1` rows to keep the pair together.
    #[test]
    fn test_split_2() {
        let chunk = StreamChunk::from_pretty(
            "  I I
             + 1 6
            U- 3 7
            U+ 4 .
             - 2 .",
        );
        let results = chunk.split(2);
        assert_eq!(2, results.len());
        assert_eq!(
            results[0].to_pretty().to_string(),
            "\
+----+---+---+
|  + | 1 | 6 |
| U- | 3 | 7 |
| U+ | 4 |   |
+----+---+---+"
        );
        assert_eq!(
            results[1].to_pretty().to_string(),
            "\
+---+---+---+
| - | 2 |   |
+---+---+---+"
        );
    }

    // Adjacent insert/delete (and delete/insert) pairs with identical row
    // values are eliminated; `D`-marked rows are already invisible.
    #[test]
    fn test_eliminate_adjacent_noop_update() {
        let c = StreamChunk::from_pretty(
            "  I I
            - 1 6 D
            - 2 2
            + 2 3
            - 2 3
            + 1 6
            - 1 7
            + 1 10 D
            + 1 7
            U- 3 7
            U+ 3 7
            + 2 3",
        );
        let c = c.eliminate_adjacent_noop_update();
        assert_eq!(
            c.to_pretty().to_string(),
            "\
+---+---+---+
| - | 2 | 2 |
| + | 1 | 6 |
| + | 2 | 3 |
+---+---+---+"
        );
    }

    // A U-/U+ pair whose U- half is eliminated must have its surviving U+
    // normalized into a plain Insert.
    #[test]
    fn test_eliminate_adjacent_noop_update_normalize_update_pair() {
        let c = StreamChunk::from_pretty(
            "  I I
            + 1 10
            U- 1 10
            U+ 1 20",
        );
        let c = c.eliminate_adjacent_noop_update();
        assert_eq!(
            c.to_pretty().to_string(),
            "\
+---+---+----+
| + | 1 | 20 |
+---+---+----+"
        );
    }
}
955}