risingwave_common/array/
data_chunk.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16use std::fmt::Display;
17use std::hash::BuildHasher;
18use std::sync::Arc;
19
20use bytes::Bytes;
21use either::Either;
22use itertools::Itertools;
23use rand::rngs::SmallRng;
24use rand::{Rng, SeedableRng};
25use risingwave_common_estimate_size::EstimateSize;
26use risingwave_pb::data::PbDataChunk;
27
28use super::{Array, ArrayImpl, ArrayRef, ArrayResult, StructArray};
29use crate::array::ArrayBuilderImpl;
30use crate::array::data_chunk_iter::RowRef;
31use crate::bitmap::{Bitmap, BitmapBuilder};
32use crate::field_generator::{FieldGeneratorImpl, VarcharProperty};
33use crate::hash::HashCode;
34use crate::row::Row;
35use crate::types::{DataType, DatumRef, MapType, StructType, ToOwnedDatum, ToText};
36use crate::util::chunk_coalesce::DataChunkBuilder;
37use crate::util::hash_util::finalize_hashers;
38use crate::util::iter_util::ZipEqFast;
39use crate::util::value_encoding::{
40    ValueRowSerializer, estimate_serialize_datum_size, serialize_datum_into,
41    try_get_exact_serialize_datum_size,
42};
43
/// [`DataChunk`] is a collection of columns,
/// with a visibility mask for each row.
/// For instance, we could have a [`DataChunk`] of this format.
///
/// | v1 | v2 | v3 |
/// |----|----|----|
/// | 1  | a  | t  |
/// | 2  | b  | f  |
/// | 3  | c  | t  |
/// | 4  | d  | f  |
///
/// Our columns are v1, v2, v3.
/// Then, if the visibility mask hides rows 2 and 4,
/// we will only have these rows visible:
///
/// | v1 | v2 | v3 |
/// |----|----|----|
/// | 1  | a  | t  |
/// | 3  | c  | t  |
///
/// Cloning a chunk clones the `Arc` holding the columns and the visibility [`Bitmap`].
#[derive(Clone, PartialEq)]
#[must_use]
pub struct DataChunk {
    /// The column arrays of this chunk. [`DataChunk::new`] asserts that every column has
    /// the same length as `visibility`.
    columns: Arc<[ArrayRef]>,
    /// Per-row visibility mask: a set bit marks a visible row. Its length is the chunk's
    /// capacity and its number of set bits is the chunk's cardinality.
    visibility: Bitmap,
}
69
70impl DataChunk {
    /// Preset string loaded into the `comfy_table` table built by [`DataChunk::to_pretty`].
    pub(crate) const PRETTY_TABLE_PRESET: &'static str = "||--+-++|    ++++++";
72
73    /// Create a `DataChunk` with `columns` and visibility.
74    ///
75    /// The visibility can either be a `Bitmap` or a simple cardinality number.
76    pub fn new(columns: Vec<ArrayRef>, visibility: impl Into<Bitmap>) -> Self {
77        let visibility = visibility.into();
78        let capacity = visibility.len();
79        for column in &columns {
80            assert_eq!(capacity, column.len());
81        }
82
83        DataChunk {
84            columns: columns.into(),
85            visibility,
86        }
87    }
88
89    /// `new_dummy` creates a data chunk without columns but only a cardinality.
90    pub fn new_dummy(cardinality: usize) -> Self {
91        DataChunk {
92            columns: Arc::new([]),
93            visibility: Bitmap::ones(cardinality),
94        }
95    }
96
97    /// Build a `DataChunk` with rows.
98    ///
99    /// Panics if the `rows` is empty.
100    ///
101    /// Should prefer using [`DataChunkBuilder`] instead to avoid unnecessary allocation
102    /// of rows.
103    pub fn from_rows(rows: &[impl Row], data_types: &[DataType]) -> Self {
104        // `append_one_row` will cause the builder to finish immediately once capacity is met.
105        // Hence, we allocate an extra row here, to avoid the builder finishing prematurely.
106        // This just makes the code cleaner, since we can loop through all rows, and consume it finally.
107        // TODO: introduce `new_unlimited` to decouple memory reservation from builder capacity.
108        let mut builder = DataChunkBuilder::new(data_types.to_vec(), rows.len() + 1);
109
110        for row in rows {
111            let none = builder.append_one_row(row);
112            debug_assert!(none.is_none());
113        }
114
115        builder.consume_all().expect("chunk should not be empty")
116    }
117
118    /// Return the next visible row index on or after `row_idx`.
119    pub fn next_visible_row_idx(&self, row_idx: usize) -> Option<usize> {
120        self.visibility.next_set_bit(row_idx)
121    }
122
123    pub fn into_parts(self) -> (Vec<ArrayRef>, Bitmap) {
124        (self.columns.to_vec(), self.visibility)
125    }
126
127    pub fn into_parts_v2(self) -> (Arc<[ArrayRef]>, Bitmap) {
128        (self.columns, self.visibility)
129    }
130
131    pub fn from_parts(columns: Arc<[ArrayRef]>, visibilities: Bitmap) -> Self {
132        Self {
133            columns,
134            visibility: visibilities,
135        }
136    }
137
138    pub fn dimension(&self) -> usize {
139        self.columns.len()
140    }
141
142    // TODO(rc): shall we rename this to `visible_size`? I sometimes find this confused with `capacity`.
143    /// `cardinality` returns the number of visible tuples
144    pub fn cardinality(&self) -> usize {
145        self.visibility.count_ones()
146    }
147
148    // Compute the required permits of this chunk for rate limiting.
149    pub fn rate_limit_permits(&self) -> u64 {
150        self.cardinality() as _
151    }
152
153    // TODO(rc): shall we rename this to `size`?
154    /// `capacity` returns physical length of any chunk column
155    pub fn capacity(&self) -> usize {
156        self.visibility.len()
157    }
158
159    pub fn selectivity(&self) -> f64 {
160        if self.visibility.is_empty() {
161            0.0
162        } else if self.visibility.all() {
163            1.0
164        } else {
165            self.visibility.count_ones() as f64 / self.visibility.len() as f64
166        }
167    }
168
169    pub fn with_visibility(&self, visibility: impl Into<Bitmap>) -> Self {
170        DataChunk {
171            columns: self.columns.clone(),
172            visibility: visibility.into(),
173        }
174    }
175
176    pub fn visibility(&self) -> &Bitmap {
177        &self.visibility
178    }
179
180    pub fn set_visibility(&mut self, visibility: Bitmap) {
181        assert_eq!(visibility.len(), self.capacity());
182        self.visibility = visibility;
183    }
184
185    /// Returns whether all rows in the chunk are visible, i.e., the chunk is compacted
186    /// in terms of row visibility.
187    pub fn is_vis_compacted(&self) -> bool {
188        self.visibility.all()
189    }
190
191    pub fn column_at(&self, idx: usize) -> &ArrayRef {
192        &self.columns[idx]
193    }
194
195    pub fn columns(&self) -> &[ArrayRef] {
196        &self.columns
197    }
198
199    /// Returns the data types of all columns.
200    pub fn data_types(&self) -> Vec<DataType> {
201        self.columns.iter().map(|col| col.data_type()).collect()
202    }
203
204    /// Divides one chunk into two at an column index.
205    ///
206    /// # Panics
207    ///
208    /// Panics if `idx > columns.len()`.
209    pub fn split_column_at(&self, idx: usize) -> (Self, Self) {
210        let (left, right) = self.columns.split_at(idx);
211        let left = DataChunk::new(left.to_vec(), self.visibility.clone());
212        let right = DataChunk::new(right.to_vec(), self.visibility.clone());
213        (left, right)
214    }
215
216    pub fn to_protobuf(&self) -> PbDataChunk {
217        assert!(self.visibility.all(), "must be compacted before transfer");
218        let mut proto = PbDataChunk {
219            cardinality: self.cardinality() as u32,
220            columns: Default::default(),
221        };
222        let column_ref = &mut proto.columns;
223        for array in &*self.columns {
224            column_ref.push(array.to_protobuf());
225        }
226        proto
227    }
228
229    /// Removes the invisible rows based on `visibility`. Returns a new compacted chunk
230    /// with all rows visible.
231    ///
232    /// `compact_vis` has trade-offs:
233    ///
234    /// Cost:
235    /// It has to rebuild the each column, meaning it will incur cost
236    /// of copying over bytes from the original column array to the new one.
237    ///
238    /// Benefit:
239    /// The main benefit is that the data chunk is smaller, taking up less memory.
240    /// We can also save the cost of iterating over many hidden rows.
241    pub fn compact_vis(self) -> Self {
242        if self.visibility.all() {
243            return self;
244        }
245        let cardinality = self.visibility.count_ones();
246        let columns = self
247            .columns
248            .iter()
249            .map(|col| {
250                let array = col;
251                array.compact_vis(&self.visibility, cardinality).into()
252            })
253            .collect::<Vec<_>>();
254        Self::new(columns, Bitmap::ones(cardinality))
255    }
256
257    /// Scatter a compacted chunk to a new chunk with the given visibility.
258    pub fn expand_vis(self, vis: Bitmap) -> Self {
259        let mut uncompact_builders: Vec<_> = self
260            .columns
261            .iter()
262            .map(|c| c.create_builder(vis.len()))
263            .collect();
264        let mut last_u = None;
265
266        for (idx, u) in vis.iter_ones().enumerate() {
267            // pad invisible rows with NULL
268            let zeros = if let Some(last_u) = last_u {
269                u - last_u - 1
270            } else {
271                u
272            };
273            for _ in 0..zeros {
274                uncompact_builders
275                    .iter_mut()
276                    .for_each(|builder| builder.append_null());
277            }
278            uncompact_builders
279                .iter_mut()
280                .zip_eq_fast(self.columns.iter())
281                .for_each(|(builder, c)| builder.append(c.datum_at(idx)));
282            last_u = Some(u);
283        }
284        let zeros = if let Some(last_u) = last_u {
285            vis.len() - last_u - 1
286        } else {
287            vis.len()
288        };
289        for _ in 0..zeros {
290            uncompact_builders
291                .iter_mut()
292                .for_each(|builder| builder.append_null());
293        }
294        let array: Vec<_> = uncompact_builders
295            .into_iter()
296            .map(|builder| Arc::new(builder.finish()))
297            .collect();
298
299        Self::new(array, vis)
300    }
301
302    pub fn from_protobuf(proto: &PbDataChunk) -> ArrayResult<Self> {
303        let mut columns = vec![];
304        for any_col in proto.get_columns() {
305            let cardinality = proto.get_cardinality() as usize;
306            columns.push(ArrayImpl::from_protobuf(any_col, cardinality)?.into());
307        }
308
309        let chunk = DataChunk::new(columns, proto.cardinality as usize);
310        Ok(chunk)
311    }
312
313    /// `rechunk` creates a new vector of data chunk whose size is `each_size_limit`.
314    /// When the total cardinality of all the chunks is not evenly divided by the `each_size_limit`,
315    /// the last new chunk will be the remainder.
316    pub fn rechunk(chunks: &[DataChunk], each_size_limit: usize) -> ArrayResult<Vec<DataChunk>> {
317        let Some(data_types) = chunks.first().map(|c| c.data_types()) else {
318            return Ok(Vec::new());
319        };
320
321        let mut builder = DataChunkBuilder::new(data_types, each_size_limit);
322        let mut outputs = Vec::new();
323
324        for chunk in chunks {
325            for output in builder.append_chunk(chunk.clone()) {
326                outputs.push(output);
327            }
328        }
329        if let Some(output) = builder.consume_all() {
330            outputs.push(output);
331        }
332
333        Ok(outputs)
334    }
335
336    /// Compute hash values for each row. The number of the returning `HashCodes` is `self.capacity()`.
337    /// When `skip_invisible_row` is true, the `HashCode` for the invisible rows is arbitrary.
338    pub fn get_hash_values<H: BuildHasher>(
339        &self,
340        column_idxes: &[usize],
341        hasher_builder: H,
342    ) -> Vec<HashCode<H>> {
343        let len = self.capacity();
344        let mut states = Vec::with_capacity(len);
345        states.resize_with(len, || hasher_builder.build_hasher());
346        // Compute hash for the specified columns.
347        for column_idx in column_idxes {
348            let array = self.column_at(*column_idx);
349            array.hash_vec(&mut states[..], self.visibility());
350        }
351        finalize_hashers(&states[..])
352            .into_iter()
353            .map(|hash_code| hash_code.into())
354            .collect_vec()
355    }
356
357    /// Random access a tuple in a data chunk. Return in a row format.
358    /// # Arguments
359    /// * `pos` - Index of look up tuple
360    /// * `RowRef` - Reference of data tuple
361    /// * bool - whether this tuple is visible
362    pub fn row_at(&self, pos: usize) -> (RowRef<'_>, bool) {
363        let row = self.row_at_unchecked_vis(pos);
364        let vis = self.visibility.is_set(pos);
365        (row, vis)
366    }
367
368    /// Random access a tuple in a data chunk. Return in a row format.
369    /// Note that this function do not return whether the row is visible.
370    /// # Arguments
371    /// * `pos` - Index of look up tuple
372    pub fn row_at_unchecked_vis(&self, pos: usize) -> RowRef<'_> {
373        RowRef::new(self, pos)
374    }
375
376    /// Returns a table-like text representation of the `DataChunk`.
377    pub fn to_pretty(&self) -> impl Display + use<> {
378        use comfy_table::Table;
379
380        if self.cardinality() == 0 {
381            return Either::Left("(empty)");
382        }
383
384        let mut table = Table::new();
385        table.load_preset(Self::PRETTY_TABLE_PRESET);
386
387        for row in self.rows() {
388            let cells: Vec<_> = row
389                .iter()
390                .map(|v| {
391                    match v {
392                        None => "".to_owned(), // NULL
393                        Some(scalar) => scalar.to_text(),
394                    }
395                })
396                .collect();
397            table.add_row(cells);
398        }
399
400        Either::Right(table)
401    }
402
403    /// Keep the specified columns and set the rest elements to null.
404    ///
405    /// # Example
406    ///
407    /// ```text
408    /// i i i                            i i i
409    /// 1 2 3  --> keep_columns([1]) --> . 2 .
410    /// 4 5 6                            . 5 .
411    /// ```
412    pub fn keep_columns(&self, column_indices: &[usize]) -> Self {
413        let capacity: usize = self.capacity();
414        let columns = (self.columns.iter().enumerate())
415            .map(|(i, column)| {
416                if column_indices.contains(&i) {
417                    column.clone()
418                } else {
419                    let mut builder = column.create_builder(capacity);
420                    builder.append_n(capacity, None as DatumRef<'_>);
421                    builder.finish().into()
422                }
423            })
424            .collect();
425        DataChunk {
426            columns,
427            visibility: self.visibility.clone(),
428        }
429    }
430
431    /// Reorder (and possibly remove) columns.
432    ///
433    /// e.g. if `indices` is `[2, 1, 0]`, and the chunk contains column `[a, b, c]`, then the output
434    /// will be `[c, b, a]`. If `indices` is [2, 0], then the output will be `[c, a]`.
435    /// If the input mapping is identity mapping, no reorder will be performed.
436    pub fn project(&self, indices: &[usize]) -> Self {
437        Self {
438            columns: indices.iter().map(|i| self.columns[*i].clone()).collect(),
439            visibility: self.visibility.clone(),
440        }
441    }
442
443    /// Reorder columns and set visibility.
444    pub fn project_with_vis(&self, indices: &[usize], visibility: Bitmap) -> Self {
445        assert_eq!(visibility.len(), self.capacity());
446        Self {
447            columns: indices.iter().map(|i| self.columns[*i].clone()).collect(),
448            visibility,
449        }
450    }
451
452    /// Reorder rows by indexes.
453    pub fn reorder_rows(&self, indexes: &[usize]) -> Self {
454        let mut array_builders: Vec<ArrayBuilderImpl> = self
455            .columns
456            .iter()
457            .map(|col| col.create_builder(indexes.len()))
458            .collect();
459        for &i in indexes {
460            for (builder, col) in array_builders.iter_mut().zip_eq_fast(self.columns.iter()) {
461                builder.append(col.value_at(i));
462            }
463        }
464        let columns = array_builders
465            .into_iter()
466            .map(|builder| builder.finish().into())
467            .collect();
468        DataChunk::new(columns, indexes.len())
469    }
470
471    /// Partition fixed size datums and variable length ones.
472    /// ---
473    /// In some cases, we have fixed size for the entire column,
474    /// when the datatypes are fixed size or the datums are constants.
475    /// As such we can compute the size for it just once for the column.
476    ///
477    /// Otherwise, for variable sized datatypes, such as `varchar`,
478    /// we have to individually compute their sizes per row.
479    fn partition_sizes(&self) -> (usize, Vec<&ArrayRef>) {
480        let mut col_variable: Vec<&ArrayRef> = vec![];
481        let mut row_len_fixed: usize = 0;
482        for c in &*self.columns {
483            if let Some(field_len) = try_get_exact_serialize_datum_size(c) {
484                row_len_fixed += field_len;
485            } else {
486                col_variable.push(c);
487            }
488        }
489        (row_len_fixed, col_variable)
490    }
491
492    unsafe fn compute_size_of_variable_cols_in_row(
493        variable_cols: &[&ArrayRef],
494        row_idx: usize,
495    ) -> usize {
496        unsafe {
497            variable_cols
498                .iter()
499                .map(|col| estimate_serialize_datum_size(col.value_at_unchecked(row_idx)))
500                .sum::<usize>()
501        }
502    }
503
504    unsafe fn init_buffer(
505        row_len_fixed: usize,
506        variable_cols: &[&ArrayRef],
507        row_idx: usize,
508    ) -> Vec<u8> {
509        unsafe {
510            Vec::with_capacity(
511                row_len_fixed + Self::compute_size_of_variable_cols_in_row(variable_cols, row_idx),
512            )
513        }
514    }
515
516    /// Serialize each row into value encoding bytes.
517    ///
518    /// The returned vector's size is `self.capacity()` and for the invisible row will give a empty
519    /// bytes.
520    // Note(bugen): should we exclude the invisible rows in the output so that the caller won't need
521    // to handle visibility again?
522    pub fn serialize(&self) -> Vec<Bytes> {
523        let buffers = if !self.visibility.all() {
524            let rows_num = self.visibility.len();
525            let mut buffers: Vec<Vec<u8>> = vec![];
526            let (row_len_fixed, col_variable) = self.partition_sizes();
527
528            // First initialize buffer with the right size to avoid re-allocations
529            for i in 0..rows_num {
530                // SAFETY(value_at_unchecked): the idx is always in bound.
531                unsafe {
532                    if self.visibility.is_set_unchecked(i) {
533                        buffers.push(Self::init_buffer(row_len_fixed, &col_variable, i));
534                    } else {
535                        buffers.push(vec![]);
536                    }
537                }
538            }
539
540            // Then do the actual serialization
541            for c in &*self.columns {
542                assert_eq!(c.len(), rows_num);
543                for (i, buffer) in buffers.iter_mut().enumerate() {
544                    // SAFETY(value_at_unchecked): the idx is always in bound.
545                    unsafe {
546                        if self.visibility.is_set_unchecked(i) {
547                            serialize_datum_into(c.value_at_unchecked(i), buffer);
548                        }
549                    }
550                }
551            }
552            buffers
553        } else {
554            let mut buffers: Vec<Vec<u8>> = vec![];
555            let (row_len_fixed, col_variable) = self.partition_sizes();
556            for i in 0..self.visibility.len() {
557                unsafe {
558                    buffers.push(Self::init_buffer(row_len_fixed, &col_variable, i));
559                }
560            }
561            for c in &*self.columns {
562                assert_eq!(c.len(), self.visibility.len());
563                for (i, buffer) in buffers.iter_mut().enumerate() {
564                    // SAFETY(value_at_unchecked): the idx is always in bound.
565                    unsafe {
566                        serialize_datum_into(c.value_at_unchecked(i), buffer);
567                    }
568                }
569            }
570            buffers
571        };
572
573        buffers.into_iter().map(|item| item.into()).collect_vec()
574    }
575
576    /// Serialize each row into bytes with given serializer.
577    ///
578    /// This is similar to `serialize` but it uses a custom serializer. Prefer `serialize` if
579    /// possible since it might be more efficient due to columnar operations.
580    pub fn serialize_with(&self, serializer: &impl ValueRowSerializer) -> Vec<Bytes> {
581        let mut results = Vec::with_capacity(self.capacity());
582        for row in self.rows_with_holes() {
583            results.push(if let Some(row) = row {
584                serializer.serialize(row).into()
585            } else {
586                Bytes::new()
587            });
588        }
589        results
590    }
591
592    /// Estimate size of hash keys. Their indices in a row are indicated by `column_indices`.
593    /// Size here refers to the number of u8s required to store the serialized datum.
594    pub fn estimate_value_encoding_size(&self, column_indices: &[usize]) -> usize {
595        if self.capacity() == 0 {
596            0
597        } else {
598            column_indices
599                .iter()
600                .map(|idx| {
601                    let datum = self.column_at(*idx).datum_at(0);
602                    estimate_serialize_datum_size(datum)
603                })
604                .sum()
605        }
606    }
607}
608
609impl fmt::Debug for DataChunk {
610    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
611        write!(
612            f,
613            "DataChunk {{ cardinality = {}, capacity = {}, data = \n{} }}",
614            self.cardinality(),
615            self.capacity(),
616            self.to_pretty()
617        )
618    }
619}
620
621impl<'a> From<&'a StructArray> for DataChunk {
622    fn from(array: &'a StructArray) -> Self {
623        Self {
624            columns: array.fields().cloned().collect(),
625            visibility: Bitmap::ones(array.len()),
626        }
627    }
628}
629
630impl EstimateSize for DataChunk {
631    fn estimated_heap_size(&self) -> usize {
632        self.columns
633            .iter()
634            .map(|a| a.estimated_heap_size())
635            .sum::<usize>()
636            + self.visibility.estimated_heap_size()
637    }
638}
639
/// Test utilities for [`DataChunk`].
pub trait DataChunkTestExt {
    /// SEED for generating data chunk.
    const SEED: u64 = 0xFF67FEABBAEF76FF;

    /// Parse a chunk from string.
    ///
    /// # Format
    ///
    /// The first non-blank line is a header indicating the column types.
    /// The following lines indicate rows within the chunk.
    /// Each line lists the values of one row, optionally followed by `D` to mark the row
    /// as invisible.
    /// NULL values are represented as `.`.
    ///
    /// # Example
    /// ```
    /// use risingwave_common::array::{DataChunk, DataChunkTestExt};
    /// let chunk = DataChunk::from_pretty(
    ///     "I I I I      // type chars
    ///      2 5 . .      // '.' means NULL
    ///      2 5 2 6 D    // 'D' means deleted in visibility
    ///      . . 4 8      // ^ comments are ignored
    ///      . . 3 4",
    /// );
    ///
    /// // type chars:
    /// //     B: bool
    /// //     I: i64
    /// //     i: i32
    /// //     F: f64
    /// //     f: f32
    /// //     T: str
    /// //    TS: Timestamp
    /// //    TZ: Timestamptz
    /// //     D: Date (in the header line only)
    /// //   SRL: Serial
    /// // <i,f>: struct
    /// //   T[]: list of T
    /// // map<K,V>: map from K to V
    /// ```
    fn from_pretty(s: &str) -> Self;

    /// Insert one invisible hole after every record.
    fn with_invisible_holes(self) -> Self
    where
        Self: Sized;

    /// Panic if the chunk is invalid.
    fn assert_valid(&self);

    /// Generate data chunk when supplied with `chunk_size` and column data types.
    ///
    /// `chunk_offset` identifies the chunk, `string_properties` configures generated
    /// varchar values, and `visibility_ratio` is the desired ratio of visible rows.
    fn gen_data_chunk(
        chunk_offset: usize,
        chunk_size: usize,
        data_types: &[DataType],
        string_properties: &VarcharProperty,
        visibility_ratio: f64,
    ) -> Self;

    /// Generate data chunks when supplied with `chunk_size` and column data types.
    ///
    /// `num_of_chunks` is the number of chunks to generate; the remaining parameters have
    /// the same meaning as in `gen_data_chunk`.
    fn gen_data_chunks(
        num_of_chunks: usize,
        chunk_size: usize,
        data_types: &[DataType],
        string_properties: &VarcharProperty,
        visibility_ratio: f64,
    ) -> Vec<Self>
    where
        Self: Sized;
}
706
707impl DataChunkTestExt for DataChunk {
    /// Parses a chunk from its pretty text form: a header line of type tokens followed by
    /// one line per row. Blank lines are skipped. A row may end with `D` to mark it as
    /// invisible, and anything from a `//` token onward is treated as a comment.
    ///
    /// Panics on an empty input, an unparsable value, or an unexpected trailing token;
    /// unsupported type tokens hit `todo!`.
    fn from_pretty(s: &str) -> Self {
        use crate::types::ScalarImpl;
        /// Maps a type token from the header line to a `DataType`.
        fn parse_type(s: &str) -> DataType {
            // A `[]` suffix denotes a list of the preceding type (applied recursively).
            if let Some(s) = s.strip_suffix("[]") {
                return DataType::list(parse_type(s));
            }

            // Special logic to support Map type in `DataChunk::from_pretty`.
            // Please refer to `src/expr/impl/src/scalar/map_filter.rs`.
            if let Some(inner) = s.strip_prefix("map<").and_then(|s| s.strip_suffix('>')) {
                let mut parts = inner.split(',');
                let key_type = parts.next().expect("Key type expected");
                let value_type = parts.next().expect("Value type expected");
                return DataType::Map(MapType::from_kv(
                    parse_type(key_type),
                    parse_type(value_type),
                ));
            }

            match s {
                "B" => DataType::Boolean,
                "I" => DataType::Int64,
                "i" => DataType::Int32,
                "F" => DataType::Float64,
                "f" => DataType::Float32,
                "TS" => DataType::Timestamp,
                "TZ" => DataType::Timestamptz,
                "T" => DataType::Varchar,
                "SRL" => DataType::Serial,
                "D" => DataType::Date,
                // `<a,b,...>` denotes an unnamed struct whose fields are the comma-separated types.
                array if array.starts_with('<') && array.ends_with('>') => {
                    DataType::Struct(StructType::unnamed(
                        array[1..array.len() - 1]
                            .split(',')
                            .map(parse_type)
                            .collect(),
                    ))
                }
                _ => todo!("unsupported type: {s:?}"),
            }
        }

        let mut lines = s.split('\n').filter(|l| !l.trim().is_empty());
        // initialize array builders from the first line
        let header = lines.next().unwrap().trim();
        // Type tokens end at the first `//`, which starts a trailing comment.
        let datatypes = header
            .split_ascii_whitespace()
            .take_while(|c| *c != "//")
            .map(parse_type)
            .collect::<Vec<_>>();
        let mut array_builders = datatypes
            .iter()
            .map(|ty| ty.create_array_builder(1))
            .collect::<Vec<_>>();
        let mut visibility = vec![];
        for line in lines {
            let mut token = line.trim().split_ascii_whitespace();
            // allow `zip` since `token` may longer than `array_builders`
            // `token` is borrowed mutably so the tokens left after the column values can
            // still be inspected below.
            #[allow(clippy::disallowed_methods)]
            for ((builder, ty), val_str) in
                array_builders.iter_mut().zip(&datatypes).zip(&mut token)
            {
                let datum = match val_str {
                    "." => None,
                    "(empty)" => Some("".into()),
                    // `from_text_for_test` has support for Map.
                    _ => Some(ScalarImpl::from_text_for_test(val_str, ty).unwrap()),
                };
                builder.append(datum);
            }
            // The token after the values decides visibility: nothing or a comment means
            // visible, `D` means invisible, anything else is rejected.
            let visible = match token.next() {
                None | Some("//") => true,
                Some("D") => false,
                Some(t) => panic!("invalid token: {t:?}"),
            };
            visibility.push(visible);
        }
        let columns = array_builders
            .into_iter()
            .map(|builder| builder.finish().into())
            .collect();
        let vis = Bitmap::from_iter(visibility);
        let chunk = DataChunk::new(columns, vis);
        chunk.assert_valid();
        chunk
    }
794
795    fn with_invisible_holes(self) -> Self
796    where
797        Self: Sized,
798    {
799        let (cols, vis) = self.into_parts();
800        let n = vis.len();
801        let mut new_vis = BitmapBuilder::with_capacity(n * 2);
802        for b in vis.iter() {
803            new_vis.append(b);
804            new_vis.append(false);
805        }
806        let new_cols = cols
807            .into_iter()
808            .map(|col| {
809                let arr = col;
810                let mut builder = arr.create_builder(n * 2);
811                for v in arr.iter() {
812                    builder.append(v.to_owned_datum());
813                    builder.append_null();
814                }
815
816                builder.finish().into()
817            })
818            .collect();
819        let chunk = DataChunk::new(new_cols, new_vis.finish());
820        chunk.assert_valid();
821        chunk
822    }
823
824    fn assert_valid(&self) {
825        let cols = self.columns();
826        let vis = &self.visibility;
827        let n = vis.len();
828        for col in cols {
829            assert_eq!(col.len(), n);
830        }
831    }
832
833    fn gen_data_chunk(
834        chunk_offset: usize,
835        chunk_size: usize,
836        data_types: &[DataType],
837        varchar_properties: &VarcharProperty,
838        visibility_percent: f64,
839    ) -> Self {
840        let vis = if visibility_percent == 0.0 {
841            Bitmap::zeros(chunk_size)
842        } else if visibility_percent == 1.0 {
843            Bitmap::ones(chunk_size)
844        } else {
845            let mut rng = SmallRng::from_seed([0; 32]);
846            let mut vis_builder = BitmapBuilder::with_capacity(chunk_size);
847            for _i in 0..chunk_size {
848                vis_builder.append(rng.random_bool(visibility_percent));
849            }
850            vis_builder.finish()
851        };
852
853        let mut columns = Vec::new();
854        // Generate columns of this chunk.
855        for data_type in data_types {
856            let mut array_builder = data_type.create_array_builder(chunk_size);
857            for j in 0..chunk_size {
858                let offset = ((chunk_offset + 1) * (j + 1)) as u64;
859                match data_type {
860                    DataType::Varchar => {
861                        let datum =
862                            FieldGeneratorImpl::with_varchar(varchar_properties, Self::SEED)
863                                .generate_datum(offset);
864                        array_builder.append(&datum);
865                    }
866                    DataType::Timestamp => {
867                        let datum =
868                            FieldGeneratorImpl::with_timestamp(None, None, None, Self::SEED)
869                                .expect("create timestamp generator should succeed")
870                                .generate_datum(offset);
871                        array_builder.append(datum);
872                    }
873                    DataType::Timestamptz => {
874                        let datum =
875                            FieldGeneratorImpl::with_timestamptz(None, None, None, Self::SEED)
876                                .expect("create timestamptz generator should succeed")
877                                .generate_datum(offset);
878                        array_builder.append(datum);
879                    }
880                    _ if data_type.is_numeric() => {
881                        let mut data_gen = FieldGeneratorImpl::with_number_random(
882                            data_type.clone(),
883                            None,
884                            None,
885                            Self::SEED,
886                        )
887                        .unwrap();
888                        let datum = data_gen.generate_datum(offset);
889                        array_builder.append(datum);
890                    }
891                    _ => todo!("unsupported type: {data_type:?}"),
892                }
893            }
894            columns.push(array_builder.finish().into());
895        }
896        DataChunk::new(columns, vis)
897    }
898
899    fn gen_data_chunks(
900        num_of_chunks: usize,
901        chunk_size: usize,
902        data_types: &[DataType],
903        varchar_properties: &VarcharProperty,
904        visibility_percent: f64,
905    ) -> Vec<Self> {
906        (0..num_of_chunks)
907            .map(|i| {
908                Self::gen_data_chunk(
909                    i,
910                    chunk_size,
911                    data_types,
912                    varchar_properties,
913                    visibility_percent,
914                )
915            })
916            .collect()
917    }
918}
919
#[cfg(test)]
mod tests {
    use crate::array::*;
    use crate::row::Row;

    /// Rechunking must keep all values in their original order and produce full chunks of the
    /// requested size followed by at most one shorter chunk.
    #[test]
    fn test_rechunk() {
        fn check(num_chunks: usize, chunk_size: usize, new_chunk_size: usize) {
            // Input: single-column chunks holding the consecutive values 0, 1, 2, ...
            let mut chunks = vec![];
            for chunk_idx in 0..num_chunks {
                let first = chunk_size * chunk_idx;
                let mut builder = PrimitiveArrayBuilder::<i32>::new(0);
                for value in first..first + chunk_size {
                    builder.append(Some(value as i32));
                }
                chunks.push(DataChunk::new(
                    vec![Arc::new(builder.finish().into())],
                    chunk_size,
                ));
            }

            // Expected layout: as many full chunks as fit, then the remainder (if any).
            let total_size = num_chunks * chunk_size;
            let mut expected_sizes = vec![new_chunk_size; total_size / new_chunk_size];
            match total_size % new_chunk_size {
                0 => {}
                remainder => expected_sizes.push(remainder),
            }

            let new_chunks = DataChunk::rechunk(&chunks, new_chunk_size).unwrap();
            assert_eq!(new_chunks.len(), expected_sizes.len());
            // check cardinality
            for (new_chunk, expected) in new_chunks.iter().zip(expected_sizes.iter()) {
                assert_eq!(*expected, new_chunk.capacity());
            }

            // Walking the new chunks front to back must yield 0, 1, 2, ... without gaps.
            let mut expected_value = 0_i32;
            for (new_chunk, &rows) in new_chunks.iter().zip(expected_sizes.iter()) {
                for row_idx in 0..rows {
                    assert_eq!(
                        new_chunk
                            .column_at(0)
                            .as_int32()
                            .value_at(row_idx)
                            .unwrap(),
                        expected_value
                    );
                    expected_value += 1;
                }
            }
        }

        check(0, 0, 1);
        check(0, 10, 1);
        check(10, 0, 1);
        check(1, 1, 6);
        check(1, 10, 11);
        check(2, 3, 6);
        check(5, 5, 6);
        check(10, 10, 7);
    }

    /// Every row produced by the row iterator must expose, per column, the value that the
    /// column was filled with (the column index itself).
    #[test]
    fn test_chunk_iter() {
        let num_of_columns: usize = 2;
        let length = 5;
        let columns: Vec<ArrayRef> = (0..num_of_columns)
            .map(|col_idx| {
                let mut builder = PrimitiveArrayBuilder::<i32>::new(length);
                for _ in 0..length {
                    builder.append(Some(col_idx as i32));
                }
                Arc::new(builder.finish().into())
            })
            .collect();
        let chunk: DataChunk = DataChunk::new(columns, length);
        for row in chunk.rows() {
            for col_idx in 0..num_of_columns {
                let datum = row.datum_at(col_idx).unwrap();
                assert_eq!(datum.into_int32(), col_idx as i32);
            }
        }
    }

    /// The pretty printer renders one table row per chunk row and leaves NULL cells blank.
    #[test]
    fn test_to_pretty_string() {
        let non_null = I64Array::from_iter([1, 2, 3, 4]);
        let nullable = I64Array::from_iter([Some(6), None, Some(7), None]);
        let chunk = DataChunk::new(
            vec![Arc::new(non_null.into()), Arc::new(nullable.into())],
            4,
        );
        // The literal must stay flush left: only the first line break is escaped.
        let expected = "\
+---+---+
| 1 | 6 |
| 2 |   |
| 3 | 7 |
| 4 |   |
+---+---+";
        assert_eq!(chunk.to_pretty().to_string(), expected);
    }

    /// A dummy chunk keeps its row count, also after a protobuf round trip.
    #[test]
    fn test_no_column_chunk() {
        let chunk = DataChunk::new_dummy(10);
        assert_eq!(chunk.rows().count(), 10);

        let encoded = chunk.to_protobuf();
        let chunk_after_serde = DataChunk::from_protobuf(&encoded).unwrap();
        assert_eq!(chunk_after_serde.rows().count(), 10);
        assert_eq!(chunk_after_serde.cardinality(), 10);
    }

    /// Projection can reverse, subset, or keep the columns, and keeps the row count even when
    /// no column is selected.
    #[test]
    fn reorder_columns() {
        let chunk = DataChunk::from_pretty(
            "I I I
             2 5 1
             4 9 2
             6 9 3",
        );

        let reversed = chunk.project(&[2, 1, 0]);
        assert_eq!(
            reversed,
            DataChunk::from_pretty(
                "I I I
                 1 5 2
                 2 9 4
                 3 9 6",
            )
        );

        let subset = chunk.project(&[2, 0]);
        assert_eq!(
            subset,
            DataChunk::from_pretty(
                "I I
                 1 2
                 2 4
                 3 6",
            )
        );

        let identity = chunk.project(&[0, 1, 2]);
        assert_eq!(identity, chunk);

        let no_columns = chunk.project(&[]);
        assert_eq!(no_columns.cardinality(), 3);
    }

    /// Pins the estimated heap size of two small integer chunks.
    #[test]
    fn test_chunk_estimated_size() {
        let cases = [
            (
                72,
                "I I I
                 1 5 2
                 2 9 4
                 3 9 6",
            ),
            (
                48,
                "I I
                 1 2
                 2 4
                 3 6",
            ),
        ];
        for (expected, pretty) in cases {
            assert_eq!(
                expected,
                DataChunk::from_pretty(pretty).estimated_heap_size()
            );
        }
    }
}