risingwave_common/array/
data_chunk.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16use std::fmt::Display;
17use std::hash::BuildHasher;
18use std::sync::Arc;
19
20use bytes::Bytes;
21use either::Either;
22use itertools::Itertools;
23use rand::rngs::SmallRng;
24use rand::{Rng, SeedableRng};
25use risingwave_common_estimate_size::EstimateSize;
26use risingwave_pb::data::PbDataChunk;
27
28use super::{Array, ArrayImpl, ArrayRef, ArrayResult, StructArray};
29use crate::array::ArrayBuilderImpl;
30use crate::array::data_chunk_iter::RowRef;
31use crate::bitmap::{Bitmap, BitmapBuilder};
32use crate::field_generator::{FieldGeneratorImpl, VarcharProperty};
33use crate::hash::HashCode;
34use crate::row::Row;
35use crate::types::{DataType, DatumRef, MapType, StructType, ToOwnedDatum, ToText};
36use crate::util::chunk_coalesce::DataChunkBuilder;
37use crate::util::hash_util::finalize_hashers;
38use crate::util::iter_util::ZipEqFast;
39use crate::util::value_encoding::{
40    ValueRowSerializer, estimate_serialize_datum_size, serialize_datum_into,
41    try_get_exact_serialize_datum_size,
42};
43
/// [`DataChunk`] is a collection of columns,
/// with a visibility mask for each row.
/// For instance, we could have a [`DataChunk`] of this format.
///
/// | v1 | v2 | v3 |
/// |----|----|----|
/// | 1  | a  | t  |
/// | 2  | b  | f  |
/// | 3  | c  | t  |
/// | 4  | d  | f  |
///
/// Our columns are v1, v2, v3.
/// Then, if the visibility mask hides rows 2 and 4,
/// only these rows are visible:
///
/// | v1 | v2 | v3 |
/// |----|----|----|
/// | 1  | a  | t  |
/// | 3  | c  | t  |
///
/// Invisible rows are still physically stored: [`DataChunk::capacity`] counts every row,
/// while [`DataChunk::cardinality`] counts only the visible ones.
#[derive(Clone, PartialEq)]
#[must_use]
pub struct DataChunk {
    /// The columns of this chunk, shared behind an `Arc` so that cloning a chunk does not copy
    /// the column list. Each column is expected to hold `visibility.len()` rows.
    columns: Arc<[ArrayRef]>,
    /// One bit per physical row; a set bit marks the row as visible.
    visibility: Bitmap,
}
69
70impl DataChunk {
71    pub(crate) const PRETTY_TABLE_PRESET: &'static str = "||--+-++|    ++++++";
72
73    /// Create a `DataChunk` with `columns` and visibility.
74    ///
75    /// The visibility can either be a `Bitmap` or a simple cardinality number.
76    pub fn new(columns: Vec<ArrayRef>, visibility: impl Into<Bitmap>) -> Self {
77        let visibility = visibility.into();
78        let capacity = visibility.len();
79        for column in &columns {
80            assert_eq!(capacity, column.len());
81        }
82
83        DataChunk {
84            columns: columns.into(),
85            visibility,
86        }
87    }
88
89    /// `new_dummy` creates a data chunk without columns but only a cardinality.
90    pub fn new_dummy(cardinality: usize) -> Self {
91        DataChunk {
92            columns: Arc::new([]),
93            visibility: Bitmap::ones(cardinality),
94        }
95    }
96
97    /// Build a `DataChunk` with rows.
98    ///
99    /// Panics if the `rows` is empty.
100    ///
101    /// Should prefer using [`DataChunkBuilder`] instead to avoid unnecessary allocation
102    /// of rows.
103    pub fn from_rows(rows: &[impl Row], data_types: &[DataType]) -> Self {
104        // `append_one_row` will cause the builder to finish immediately once capacity is met.
105        // Hence, we allocate an extra row here, to avoid the builder finishing prematurely.
106        // This just makes the code cleaner, since we can loop through all rows, and consume it finally.
107        // TODO: introduce `new_unlimited` to decouple memory reservation from builder capacity.
108        let mut builder = DataChunkBuilder::new(data_types.to_vec(), rows.len() + 1);
109
110        for row in rows {
111            let none = builder.append_one_row(row);
112            debug_assert!(none.is_none());
113        }
114
115        builder.consume_all().expect("chunk should not be empty")
116    }
117
118    /// Return the next visible row index on or after `row_idx`.
119    pub fn next_visible_row_idx(&self, row_idx: usize) -> Option<usize> {
120        self.visibility.next_set_bit(row_idx)
121    }
122
123    pub fn into_parts(self) -> (Vec<ArrayRef>, Bitmap) {
124        (self.columns.to_vec(), self.visibility)
125    }
126
127    pub fn into_parts_v2(self) -> (Arc<[ArrayRef]>, Bitmap) {
128        (self.columns, self.visibility)
129    }
130
131    pub fn from_parts(columns: Arc<[ArrayRef]>, visibilities: Bitmap) -> Self {
132        Self {
133            columns,
134            visibility: visibilities,
135        }
136    }
137
138    pub fn dimension(&self) -> usize {
139        self.columns.len()
140    }
141
142    // TODO(rc): shall we rename this to `visible_size`? I sometimes find this confused with `capacity`.
143    /// `cardinality` returns the number of visible tuples
144    pub fn cardinality(&self) -> usize {
145        self.visibility.count_ones()
146    }
147
148    // Compute the required permits of this chunk for rate limiting.
149    pub fn rate_limit_permits(&self) -> u64 {
150        self.cardinality() as _
151    }
152
153    // TODO(rc): shall we rename this to `size`?
154    /// `capacity` returns physical length of any chunk column
155    pub fn capacity(&self) -> usize {
156        self.visibility.len()
157    }
158
159    pub fn selectivity(&self) -> f64 {
160        if self.visibility.is_empty() {
161            0.0
162        } else if self.visibility.all() {
163            1.0
164        } else {
165            self.visibility.count_ones() as f64 / self.visibility.len() as f64
166        }
167    }
168
169    pub fn with_visibility(&self, visibility: impl Into<Bitmap>) -> Self {
170        DataChunk {
171            columns: self.columns.clone(),
172            visibility: visibility.into(),
173        }
174    }
175
176    pub fn visibility(&self) -> &Bitmap {
177        &self.visibility
178    }
179
180    pub fn set_visibility(&mut self, visibility: Bitmap) {
181        assert_eq!(visibility.len(), self.capacity());
182        self.visibility = visibility;
183    }
184
185    /// Returns whether all rows in the chunk are visible, i.e., the chunk is compacted
186    /// in terms of row visibility.
187    pub fn is_vis_compacted(&self) -> bool {
188        self.visibility.all()
189    }
190
191    pub fn column_at(&self, idx: usize) -> &ArrayRef {
192        &self.columns[idx]
193    }
194
195    pub fn columns(&self) -> &[ArrayRef] {
196        &self.columns
197    }
198
199    /// Returns the data types of all columns.
200    pub fn data_types(&self) -> Vec<DataType> {
201        self.columns.iter().map(|col| col.data_type()).collect()
202    }
203
204    /// Divides one chunk into two at an column index.
205    ///
206    /// # Panics
207    ///
208    /// Panics if `idx > columns.len()`.
209    pub fn split_column_at(&self, idx: usize) -> (Self, Self) {
210        let (left, right) = self.columns.split_at(idx);
211        let left = DataChunk::new(left.to_vec(), self.visibility.clone());
212        let right = DataChunk::new(right.to_vec(), self.visibility.clone());
213        (left, right)
214    }
215
216    pub fn to_protobuf(&self) -> PbDataChunk {
217        assert!(self.visibility.all(), "must be compacted before transfer");
218        let mut proto = PbDataChunk {
219            cardinality: self.cardinality() as u32,
220            columns: Default::default(),
221        };
222        let column_ref = &mut proto.columns;
223        for array in &*self.columns {
224            column_ref.push(array.to_protobuf());
225        }
226        proto
227    }
228
229    /// Removes the invisible rows based on `visibility`. Returns a new compacted chunk
230    /// with all rows visible.
231    ///
232    /// `compact_vis` has trade-offs:
233    ///
234    /// Cost:
235    /// It has to rebuild the each column, meaning it will incur cost
236    /// of copying over bytes from the original column array to the new one.
237    ///
238    /// Benefit:
239    /// The main benefit is that the data chunk is smaller, taking up less memory.
240    /// We can also save the cost of iterating over many hidden rows.
241    pub fn compact_vis(self) -> Self {
242        if self.visibility.all() {
243            return self;
244        }
245        let cardinality = self.visibility.count_ones();
246        let columns = self
247            .columns
248            .iter()
249            .map(|col| {
250                let array = col;
251                array.compact_vis(&self.visibility, cardinality).into()
252            })
253            .collect::<Vec<_>>();
254        Self::new(columns, Bitmap::ones(cardinality))
255    }
256
257    /// Scatter a compacted chunk to a new chunk with the given visibility.
258    pub fn expand_vis(self, vis: Bitmap) -> Self {
259        let mut uncompact_builders: Vec<_> = self
260            .columns
261            .iter()
262            .map(|c| c.create_builder(vis.len()))
263            .collect();
264        let mut last_u = None;
265
266        for (idx, u) in vis.iter_ones().enumerate() {
267            // pad invisible rows with NULL
268            let zeros = if let Some(last_u) = last_u {
269                u - last_u - 1
270            } else {
271                u
272            };
273            for _ in 0..zeros {
274                uncompact_builders
275                    .iter_mut()
276                    .for_each(|builder| builder.append_null());
277            }
278            uncompact_builders
279                .iter_mut()
280                .zip_eq_fast(self.columns.iter())
281                .for_each(|(builder, c)| builder.append(c.datum_at(idx)));
282            last_u = Some(u);
283        }
284        let zeros = if let Some(last_u) = last_u {
285            vis.len() - last_u - 1
286        } else {
287            vis.len()
288        };
289        for _ in 0..zeros {
290            uncompact_builders
291                .iter_mut()
292                .for_each(|builder| builder.append_null());
293        }
294        let array: Vec<_> = uncompact_builders
295            .into_iter()
296            .map(|builder| Arc::new(builder.finish()))
297            .collect();
298
299        Self::new(array, vis)
300    }
301
302    pub fn from_protobuf(proto: &PbDataChunk) -> ArrayResult<Self> {
303        let mut columns = vec![];
304        for any_col in proto.get_columns() {
305            let cardinality = proto.get_cardinality() as usize;
306            columns.push(ArrayImpl::from_protobuf(any_col, cardinality)?.into());
307        }
308
309        let chunk = DataChunk::new(columns, proto.cardinality as usize);
310        Ok(chunk)
311    }
312
313    /// `rechunk` creates a new vector of data chunk whose size is `each_size_limit`.
314    /// When the total cardinality of all the chunks is not evenly divided by the `each_size_limit`,
315    /// the last new chunk will be the remainder.
316    pub fn rechunk(chunks: &[DataChunk], each_size_limit: usize) -> ArrayResult<Vec<DataChunk>> {
317        let Some(data_types) = chunks.first().map(|c| c.data_types()) else {
318            return Ok(Vec::new());
319        };
320
321        let mut builder = DataChunkBuilder::new(data_types, each_size_limit);
322        let mut outputs = Vec::new();
323
324        for chunk in chunks {
325            for output in builder.append_chunk(chunk.clone()) {
326                outputs.push(output);
327            }
328        }
329        if let Some(output) = builder.consume_all() {
330            outputs.push(output);
331        }
332
333        Ok(outputs)
334    }
335
336    /// Compute hash values for each row. The number of the returning `HashCodes` is `self.capacity()`.
337    /// When `skip_invisible_row` is true, the `HashCode` for the invisible rows is arbitrary.
338    pub fn get_hash_values<H: BuildHasher>(
339        &self,
340        column_idxes: &[usize],
341        hasher_builder: H,
342    ) -> Vec<HashCode<H>> {
343        let len = self.capacity();
344        let mut states = Vec::with_capacity(len);
345        states.resize_with(len, || hasher_builder.build_hasher());
346        // Compute hash for the specified columns.
347        for column_idx in column_idxes {
348            let array = self.column_at(*column_idx);
349            array.hash_vec(&mut states[..], self.visibility());
350        }
351        finalize_hashers(&states[..])
352            .into_iter()
353            .map(|hash_code| hash_code.into())
354            .collect_vec()
355    }
356
357    /// Random access a tuple in a data chunk. Return in a row format.
358    /// # Arguments
359    /// * `pos` - Index of look up tuple
360    /// * `RowRef` - Reference of data tuple
361    /// * bool - whether this tuple is visible
362    pub fn row_at(&self, pos: usize) -> (RowRef<'_>, bool) {
363        let row = self.row_at_unchecked_vis(pos);
364        let vis = self.visibility.is_set(pos);
365        (row, vis)
366    }
367
368    /// Random access a tuple in a data chunk. Return in a row format.
369    /// Note that this function do not return whether the row is visible.
370    /// # Arguments
371    /// * `pos` - Index of look up tuple
372    pub fn row_at_unchecked_vis(&self, pos: usize) -> RowRef<'_> {
373        RowRef::new(self, pos)
374    }
375
376    /// Returns a table-like text representation of the `DataChunk`.
377    pub fn to_pretty(&self) -> impl Display + use<> {
378        use comfy_table::Table;
379
380        if self.cardinality() == 0 {
381            return Either::Left("(empty)");
382        }
383
384        let mut table = Table::new();
385        table.load_preset(Self::PRETTY_TABLE_PRESET);
386
387        for row in self.rows() {
388            let cells: Vec<_> = row
389                .iter()
390                .map(|v| {
391                    match v {
392                        None => "".to_owned(), // NULL
393                        Some(scalar) => scalar.to_text(),
394                    }
395                })
396                .collect();
397            table.add_row(cells);
398        }
399
400        Either::Right(table)
401    }
402
403    /// Keep the specified columns and set the rest elements to null.
404    ///
405    /// # Example
406    ///
407    /// ```text
408    /// i i i                            i i i
409    /// 1 2 3  --> keep_columns([1]) --> . 2 .
410    /// 4 5 6                            . 5 .
411    /// ```
412    pub fn keep_columns(&self, column_indices: &[usize]) -> Self {
413        let capacity: usize = self.capacity();
414        let columns = (self.columns.iter().enumerate())
415            .map(|(i, column)| {
416                if column_indices.contains(&i) {
417                    column.clone()
418                } else {
419                    let mut builder = column.create_builder(capacity);
420                    builder.append_n(capacity, None as DatumRef<'_>);
421                    builder.finish().into()
422                }
423            })
424            .collect();
425        DataChunk {
426            columns,
427            visibility: self.visibility.clone(),
428        }
429    }
430
431    /// Reorder (and possibly remove) columns.
432    ///
433    /// e.g. if `indices` is `[2, 1, 0]`, and the chunk contains column `[a, b, c]`, then the output
434    /// will be `[c, b, a]`. If `indices` is [2, 0], then the output will be `[c, a]`.
435    /// If the input mapping is identity mapping, no reorder will be performed.
436    pub fn project(&self, indices: &[usize]) -> Self {
437        Self {
438            columns: indices.iter().map(|i| self.columns[*i].clone()).collect(),
439            visibility: self.visibility.clone(),
440        }
441    }
442
443    /// Reorder columns and set visibility.
444    pub fn project_with_vis(&self, indices: &[usize], visibility: Bitmap) -> Self {
445        assert_eq!(visibility.len(), self.capacity());
446        Self {
447            columns: indices.iter().map(|i| self.columns[*i].clone()).collect(),
448            visibility,
449        }
450    }
451
452    /// Reorder rows by indexes.
453    pub fn reorder_rows(&self, indexes: &[usize]) -> Self {
454        let mut array_builders: Vec<ArrayBuilderImpl> = self
455            .columns
456            .iter()
457            .map(|col| col.create_builder(indexes.len()))
458            .collect();
459        for &i in indexes {
460            for (builder, col) in array_builders.iter_mut().zip_eq_fast(self.columns.iter()) {
461                builder.append(col.value_at(i));
462            }
463        }
464        let columns = array_builders
465            .into_iter()
466            .map(|builder| builder.finish().into())
467            .collect();
468        DataChunk::new(columns, indexes.len())
469    }
470
471    /// Partition fixed size datums and variable length ones.
472    /// ---
473    /// In some cases, we have fixed size for the entire column,
474    /// when the datatypes are fixed size or the datums are constants.
475    /// As such we can compute the size for it just once for the column.
476    ///
477    /// Otherwise, for variable sized datatypes, such as `varchar`,
478    /// we have to individually compute their sizes per row.
479    fn partition_sizes(&self) -> (usize, Vec<&ArrayRef>) {
480        let mut col_variable: Vec<&ArrayRef> = vec![];
481        let mut row_len_fixed: usize = 0;
482        for c in &*self.columns {
483            if let Some(field_len) = try_get_exact_serialize_datum_size(c) {
484                row_len_fixed += field_len;
485            } else {
486                col_variable.push(c);
487            }
488        }
489        (row_len_fixed, col_variable)
490    }
491
492    unsafe fn compute_size_of_variable_cols_in_row(
493        variable_cols: &[&ArrayRef],
494        row_idx: usize,
495    ) -> usize {
496        unsafe {
497            variable_cols
498                .iter()
499                .map(|col| estimate_serialize_datum_size(col.value_at_unchecked(row_idx)))
500                .sum::<usize>()
501        }
502    }
503
504    unsafe fn init_buffer(
505        row_len_fixed: usize,
506        variable_cols: &[&ArrayRef],
507        row_idx: usize,
508    ) -> Vec<u8> {
509        unsafe {
510            Vec::with_capacity(
511                row_len_fixed + Self::compute_size_of_variable_cols_in_row(variable_cols, row_idx),
512            )
513        }
514    }
515
516    /// Serialize each row into value encoding bytes.
517    ///
518    /// The returned vector's size is `self.capacity()` and for the invisible row will give a empty
519    /// bytes.
520    // Note(bugen): should we exclude the invisible rows in the output so that the caller won't need
521    // to handle visibility again?
522    pub fn serialize(&self) -> Vec<Bytes> {
523        let buffers = if !self.visibility.all() {
524            let rows_num = self.visibility.len();
525            let mut buffers: Vec<Vec<u8>> = vec![];
526            let (row_len_fixed, col_variable) = self.partition_sizes();
527
528            // First initialize buffer with the right size to avoid re-allocations
529            for i in 0..rows_num {
530                // SAFETY(value_at_unchecked): the idx is always in bound.
531                unsafe {
532                    if self.visibility.is_set_unchecked(i) {
533                        buffers.push(Self::init_buffer(row_len_fixed, &col_variable, i));
534                    } else {
535                        buffers.push(vec![]);
536                    }
537                }
538            }
539
540            // Then do the actual serialization
541            for c in &*self.columns {
542                assert_eq!(c.len(), rows_num);
543                for (i, buffer) in buffers.iter_mut().enumerate() {
544                    // SAFETY(value_at_unchecked): the idx is always in bound.
545                    unsafe {
546                        if self.visibility.is_set_unchecked(i) {
547                            serialize_datum_into(c.value_at_unchecked(i), buffer);
548                        }
549                    }
550                }
551            }
552            buffers
553        } else {
554            let mut buffers: Vec<Vec<u8>> = vec![];
555            let (row_len_fixed, col_variable) = self.partition_sizes();
556            for i in 0..self.visibility.len() {
557                unsafe {
558                    buffers.push(Self::init_buffer(row_len_fixed, &col_variable, i));
559                }
560            }
561            for c in &*self.columns {
562                assert_eq!(c.len(), self.visibility.len());
563                for (i, buffer) in buffers.iter_mut().enumerate() {
564                    // SAFETY(value_at_unchecked): the idx is always in bound.
565                    unsafe {
566                        serialize_datum_into(c.value_at_unchecked(i), buffer);
567                    }
568                }
569            }
570            buffers
571        };
572
573        buffers.into_iter().map(|item| item.into()).collect_vec()
574    }
575
576    /// Serialize each row into bytes with given serializer.
577    ///
578    /// This is similar to `serialize` but it uses a custom serializer. Prefer `serialize` if
579    /// possible since it might be more efficient due to columnar operations.
580    pub fn serialize_with(&self, serializer: &impl ValueRowSerializer) -> Vec<Bytes> {
581        let mut results = Vec::with_capacity(self.capacity());
582        for row in self.rows_with_holes() {
583            results.push(if let Some(row) = row {
584                serializer.serialize(row).into()
585            } else {
586                Bytes::new()
587            });
588        }
589        results
590    }
591
592    /// Estimate size of hash keys. Their indices in a row are indicated by `column_indices`.
593    /// Size here refers to the number of u8s required to store the serialized datum.
594    pub fn estimate_value_encoding_size(&self, column_indices: &[usize]) -> usize {
595        if self.capacity() == 0 {
596            0
597        } else {
598            column_indices
599                .iter()
600                .map(|idx| {
601                    let datum = self.column_at(*idx).datum_at(0);
602                    estimate_serialize_datum_size(datum)
603                })
604                .sum()
605        }
606    }
607}
608
609impl fmt::Debug for DataChunk {
610    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
611        write!(
612            f,
613            "DataChunk {{ cardinality = {}, capacity = {}, data = \n{} }}",
614            self.cardinality(),
615            self.capacity(),
616            self.to_pretty()
617        )
618    }
619}
620
621impl<'a> From<&'a StructArray> for DataChunk {
622    fn from(array: &'a StructArray) -> Self {
623        Self {
624            columns: array.fields().cloned().collect(),
625            visibility: Bitmap::ones(array.len()),
626        }
627    }
628}
629
630impl EstimateSize for DataChunk {
631    fn estimated_heap_size(&self) -> usize {
632        self.columns
633            .iter()
634            .map(|a| a.estimated_heap_size())
635            .sum::<usize>()
636            + self.visibility.estimated_heap_size()
637    }
638}
639
/// Test utilities for [`DataChunk`].
pub trait DataChunkTestExt {
    /// SEED for generating data chunk (passed to the field generators by `gen_data_chunk`).
    const SEED: u64 = 0xFF67FEABBAEF76FF;

    /// Parse a chunk from string.
    ///
    /// # Format
    ///
    /// The first line is a header indicating the column types.
    /// The following lines indicate rows within the chunk.
    /// Each line lists the values of one row, optionally followed by `D` to mark the row as
    /// invisible (deleted in visibility). Anything after `//` is a comment and is ignored.
    /// NULL values are represented as `.`, and an empty string as `(empty)`.
    ///
    /// # Example
    /// ```
    /// use risingwave_common::array::{DataChunk, DataChunkTestExt};
    /// let chunk = DataChunk::from_pretty(
    ///     "I I I I      // type chars
    ///      2 5 . .      // '.' means NULL
    ///      2 5 2 6 D    // 'D' means deleted in visibility
    ///      . . 4 8      // ^ comments are ignored
    ///      . . 3 4",
    /// );
    ///
    /// // type chars:
    /// //     B: bool
    /// //     I: i64
    /// //     i: i32
    /// //     F: f64
    /// //     f: f32
    /// //     T: str
    /// //    TS: Timestamp
    /// //    TZ: Timestamptz
    /// //     D: Date (in the header line only; at the end of a row `D` means invisible)
    /// //   SRL: Serial
    /// // <i,f>: struct
    /// //   X[]: list of X
    /// // map<K,V>: map
    /// ```
    fn from_pretty(s: &str) -> Self;

    /// Insert one invisible hole (an all-NULL row) after every record, doubling the number of
    /// physical rows.
    fn with_invisible_holes(self) -> Self
    where
        Self: Sized;

    /// Panic if the chunk is invalid, e.g. a column's length differs from the visibility's.
    fn assert_valid(&self);

    /// Generate data chunk when supplied with `chunk_size` and column data types.
    ///
    /// `chunk_offset` varies the generated values between chunks. `visibility_ratio` is the
    /// probability that each row is visible, expected in `[0.0, 1.0]`.
    fn gen_data_chunk(
        chunk_offset: usize,
        chunk_size: usize,
        data_types: &[DataType],
        string_properties: &VarcharProperty,
        visibility_ratio: f64,
    ) -> Self;

    /// Generate data chunks when supplied with `chunk_size` and column data types.
    ///
    /// Produces `num_of_chunks` chunks; see [`DataChunkTestExt::gen_data_chunk`].
    fn gen_data_chunks(
        num_of_chunks: usize,
        chunk_size: usize,
        data_types: &[DataType],
        string_properties: &VarcharProperty,
        visibility_ratio: f64,
    ) -> Vec<Self>
    where
        Self: Sized;
}
706
707impl DataChunkTestExt for DataChunk {
    /// Parse a chunk from its textual "pretty" form (see the trait documentation for the format).
    ///
    /// # Panics
    ///
    /// Panics if the input has no non-empty line, on an unsupported type string or an
    /// unparsable value, on an unexpected token after a row's values, or if a row has fewer
    /// values than there are columns (the column lengths then disagree).
    fn from_pretty(s: &str) -> Self {
        use crate::types::ScalarImpl;
        // Parse one type token of the header into a `DataType`.
        // NOTE: struct and map inner types are split naively on ',', so nested types that
        // themselves contain ',' are not supported here.
        fn parse_type(s: &str) -> DataType {
            // `X[]` is a list of `X`.
            if let Some(s) = s.strip_suffix("[]") {
                return DataType::list(parse_type(s));
            }

            // Special logic to support Map type in `DataChunk::from_pretty`.
            // Please refer to `src/expr/impl/src/scalar/map_filter.rs`.
            if let Some(inner) = s.strip_prefix("map<").and_then(|s| s.strip_suffix('>')) {
                let mut parts = inner.split(',');
                let key_type = parts.next().expect("Key type expected");
                let value_type = parts.next().expect("Value type expected");
                return DataType::Map(MapType::from_kv(
                    parse_type(key_type),
                    parse_type(value_type),
                ));
            }

            match s {
                "B" => DataType::Boolean,
                "I" => DataType::Int64,
                "i" => DataType::Int32,
                "F" => DataType::Float64,
                "f" => DataType::Float32,
                "TS" => DataType::Timestamp,
                "TZ" => DataType::Timestamptz,
                "T" => DataType::Varchar,
                "SRL" => DataType::Serial,
                "D" => DataType::Date,
                array if array.starts_with('<') && array.ends_with('>') => DataType::Struct(
                    StructType::unnamed(array[1..array.len() - 1].split(',').map(parse_type)),
                ),
                _ => todo!("unsupported type: {s:?}"),
            }
        }

        let mut lines = s.split('\n').filter(|l| !l.trim().is_empty());
        // initialize array builders from the first line
        let header = lines.next().unwrap().trim();
        let datatypes = header
            .split_ascii_whitespace()
            .take_while(|c| *c != "//")
            .map(parse_type)
            .collect::<Vec<_>>();
        let mut array_builders = datatypes
            .iter()
            .map(|ty| ty.create_array_builder(1))
            .collect::<Vec<_>>();
        let mut visibility = vec![];
        for line in lines {
            let mut token = line.trim().split_ascii_whitespace();
            // allow `zip` since `token` may be longer than `array_builders`.
            // `Zip` advances its first iterator (the builders) before the second (the tokens),
            // so once every column has a value, the next token is left unconsumed in `token`
            // and is inspected below as the visibility marker.
            #[allow(clippy::disallowed_methods)]
            for ((builder, ty), val_str) in
                array_builders.iter_mut().zip(&datatypes).zip(&mut token)
            {
                let datum = match val_str {
                    "." => None,
                    "(empty)" => Some("".into()),
                    // `from_text_for_test` has support for Map.
                    _ => Some(ScalarImpl::from_text_for_test(val_str, ty).unwrap()),
                };
                builder.append(datum);
            }
            // A trailing `D` hides the row; nothing or a `//` comment keeps it visible.
            let visible = match token.next() {
                None | Some("//") => true,
                Some("D") => false,
                Some(t) => panic!("invalid token: {t:?}"),
            };
            visibility.push(visible);
        }
        let columns = array_builders
            .into_iter()
            .map(|builder| builder.finish().into())
            .collect();
        let vis = Bitmap::from_iter(visibility);
        let chunk = DataChunk::new(columns, vis);
        chunk.assert_valid();
        chunk
    }
789
790    fn with_invisible_holes(self) -> Self
791    where
792        Self: Sized,
793    {
794        let (cols, vis) = self.into_parts();
795        let n = vis.len();
796        let mut new_vis = BitmapBuilder::with_capacity(n * 2);
797        for b in vis.iter() {
798            new_vis.append(b);
799            new_vis.append(false);
800        }
801        let new_cols = cols
802            .into_iter()
803            .map(|col| {
804                let arr = col;
805                let mut builder = arr.create_builder(n * 2);
806                for v in arr.iter() {
807                    builder.append(v.to_owned_datum());
808                    builder.append_null();
809                }
810
811                builder.finish().into()
812            })
813            .collect();
814        let chunk = DataChunk::new(new_cols, new_vis.finish());
815        chunk.assert_valid();
816        chunk
817    }
818
819    fn assert_valid(&self) {
820        let cols = self.columns();
821        let vis = &self.visibility;
822        let n = vis.len();
823        for col in cols {
824            assert_eq!(col.len(), n);
825        }
826    }
827
828    fn gen_data_chunk(
829        chunk_offset: usize,
830        chunk_size: usize,
831        data_types: &[DataType],
832        varchar_properties: &VarcharProperty,
833        visibility_percent: f64,
834    ) -> Self {
835        let vis = if visibility_percent == 0.0 {
836            Bitmap::zeros(chunk_size)
837        } else if visibility_percent == 1.0 {
838            Bitmap::ones(chunk_size)
839        } else {
840            let mut rng = SmallRng::from_seed([0; 32]);
841            let mut vis_builder = BitmapBuilder::with_capacity(chunk_size);
842            for _i in 0..chunk_size {
843                vis_builder.append(rng.random_bool(visibility_percent));
844            }
845            vis_builder.finish()
846        };
847
848        let mut columns = Vec::new();
849        // Generate columns of this chunk.
850        for data_type in data_types {
851            let mut array_builder = data_type.create_array_builder(chunk_size);
852            for j in 0..chunk_size {
853                let offset = ((chunk_offset + 1) * (j + 1)) as u64;
854                match data_type {
855                    DataType::Varchar => {
856                        let datum =
857                            FieldGeneratorImpl::with_varchar(varchar_properties, Self::SEED)
858                                .generate_datum(offset);
859                        array_builder.append(&datum);
860                    }
861                    DataType::Timestamp => {
862                        let datum =
863                            FieldGeneratorImpl::with_timestamp(None, None, None, Self::SEED)
864                                .expect("create timestamp generator should succeed")
865                                .generate_datum(offset);
866                        array_builder.append(datum);
867                    }
868                    DataType::Timestamptz => {
869                        let datum =
870                            FieldGeneratorImpl::with_timestamptz(None, None, None, Self::SEED)
871                                .expect("create timestamptz generator should succeed")
872                                .generate_datum(offset);
873                        array_builder.append(datum);
874                    }
875                    _ if data_type.is_numeric() => {
876                        let mut data_gen = FieldGeneratorImpl::with_number_random(
877                            data_type.clone(),
878                            None,
879                            None,
880                            Self::SEED,
881                        )
882                        .unwrap();
883                        let datum = data_gen.generate_datum(offset);
884                        array_builder.append(datum);
885                    }
886                    _ => todo!("unsupported type: {data_type:?}"),
887                }
888            }
889            columns.push(array_builder.finish().into());
890        }
891        DataChunk::new(columns, vis)
892    }
893
894    fn gen_data_chunks(
895        num_of_chunks: usize,
896        chunk_size: usize,
897        data_types: &[DataType],
898        varchar_properties: &VarcharProperty,
899        visibility_percent: f64,
900    ) -> Vec<Self> {
901        (0..num_of_chunks)
902            .map(|i| {
903                Self::gen_data_chunk(
904                    i,
905                    chunk_size,
906                    data_types,
907                    varchar_properties,
908                    visibility_percent,
909                )
910            })
911            .collect()
912    }
913}
914
#[cfg(test)]
mod tests {
    use crate::array::*;
    use crate::row::Row;

    /// `DataChunk::rechunk` must regroup rows into chunks of the requested size, keeping the
    /// original row order. Only the last chunk may be smaller (it holds the remainder).
    #[test]
    fn test_rechunk() {
        let check = |num_chunks: usize, chunk_size: usize, new_chunk_size: usize| {
            // Input: `num_chunks` single-column chunks holding consecutive integers.
            let chunks: Vec<DataChunk> = (0..num_chunks)
                .map(|chunk_idx| {
                    let first = chunk_size * chunk_idx;
                    let mut builder = PrimitiveArrayBuilder::<i32>::new(0);
                    for value in first..first + chunk_size {
                        builder.append(Some(value as i32));
                    }
                    DataChunk::new(vec![Arc::new(builder.finish().into())], chunk_size)
                })
                .collect();

            // Expected sizes: as many full chunks as fit, plus one partial chunk if needed.
            let total_rows = num_chunks * chunk_size;
            let mut expected_sizes = vec![new_chunk_size; total_rows / new_chunk_size];
            let leftover = total_rows % new_chunk_size;
            if leftover != 0 {
                expected_sizes.push(leftover);
            }

            let rechunked = DataChunk::rechunk(&chunks, new_chunk_size).unwrap();
            assert_eq!(rechunked.len(), expected_sizes.len());
            for (chunk, expected_size) in rechunked.iter().zip(&expected_sizes) {
                assert_eq!(*expected_size, chunk.capacity());
            }

            // Reading the rechunked rows back in order must yield 0, 1, 2, ...
            let mut next_value: i32 = 0;
            for (chunk, &size) in rechunked.iter().zip(&expected_sizes) {
                let column = chunk.column_at(0).as_int32();
                for row_idx in 0..size {
                    assert_eq!(column.value_at(row_idx).unwrap(), next_value);
                    next_value += 1;
                }
            }
        };

        for (num_chunks, chunk_size, new_chunk_size) in [
            (0, 0, 1),
            (0, 10, 1),
            (10, 0, 1),
            (1, 1, 6),
            (1, 10, 11),
            (2, 3, 6),
            (5, 5, 6),
            (10, 10, 7),
        ] {
            check(num_chunks, chunk_size, new_chunk_size);
        }
    }

    /// Iterating rows must expose every column's value through `datum_at`.
    #[test]
    fn test_chunk_iter() {
        let num_of_columns: usize = 2;
        let length = 5;
        // Column `col_idx` holds `length` copies of `col_idx`.
        let columns: Vec<_> = (0..num_of_columns)
            .map(|col_idx| {
                let mut builder = PrimitiveArrayBuilder::<i32>::new(length);
                for _ in 0..length {
                    builder.append(Some(col_idx as i32));
                }
                Arc::new(builder.finish().into())
            })
            .collect();
        let chunk: DataChunk = DataChunk::new(columns, length);

        for row in chunk.rows() {
            for col_idx in 0..num_of_columns {
                let datum = row.datum_at(col_idx).unwrap();
                assert_eq!(datum.into_int32(), col_idx as i32);
            }
        }
    }

    /// Pretty-printing renders NULLs as blank cells inside an ASCII table.
    #[test]
    fn test_to_pretty_string() {
        let non_null = I64Array::from_iter([1, 2, 3, 4]);
        let with_nulls = I64Array::from_iter([Some(6), None, Some(7), None]);
        let chunk = DataChunk::new(
            vec![Arc::new(non_null.into()), Arc::new(with_nulls.into())],
            4,
        );
        let expected = "\
+---+---+
| 1 | 6 |
| 2 |   |
| 3 | 7 |
| 4 |   |
+---+---+";
        assert_eq!(chunk.to_pretty().to_string(), expected);
    }

    /// A chunk with rows but no columns keeps its row count across a protobuf round trip.
    #[test]
    fn test_no_column_chunk() {
        let original = DataChunk::new_dummy(10);
        assert_eq!(original.rows().count(), 10);

        let round_tripped = DataChunk::from_protobuf(&original.to_protobuf()).unwrap();
        assert_eq!(round_tripped.rows().count(), 10);
        assert_eq!(round_tripped.cardinality(), 10);
    }

    /// `project` selects and reorders columns by index.
    #[test]
    fn reorder_columns() {
        let chunk = DataChunk::from_pretty(
            "I I I
             2 5 1
             4 9 2
             6 9 3",
        );

        // Full permutation: columns come back in reverse order.
        let reversed = chunk.project(&[2, 1, 0]);
        assert_eq!(
            reversed,
            DataChunk::from_pretty(
                "I I I
                 1 5 2
                 2 9 4
                 3 9 6",
            )
        );

        // Partial projection drops the middle column.
        let outer = chunk.project(&[2, 0]);
        assert_eq!(
            outer,
            DataChunk::from_pretty(
                "I I
                 1 2
                 2 4
                 3 6",
            )
        );

        // Identity projection is a no-op; an empty projection still keeps the row count.
        let identity = chunk.project(&[0, 1, 2]);
        assert_eq!(identity, chunk);
        assert_eq!(chunk.project(&[]).cardinality(), 3);
    }

    /// The heap-size estimate tracks the number of columns in the chunk.
    #[test]
    fn test_chunk_estimated_size() {
        let estimate = |pretty: &str| DataChunk::from_pretty(pretty).estimated_heap_size();
        assert_eq!(
            72,
            estimate(
                "I I I
                 1 5 2
                 2 9 4
                 3 9 6",
            )
        );
        assert_eq!(
            48,
            estimate(
                "I I
                 1 2
                 2 4
                 3 6",
            )
        );
    }
}