risingwave_common/array/
data_chunk.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::borrow::Cow;
16use std::fmt;
17use std::fmt::Display;
18use std::hash::BuildHasher;
19use std::sync::Arc;
20
21use bytes::Bytes;
22use either::Either;
23use itertools::Itertools;
24use rand::rngs::SmallRng;
25use rand::{Rng, SeedableRng};
26use risingwave_common_estimate_size::EstimateSize;
27use risingwave_pb::data::PbDataChunk;
28
29use super::{Array, ArrayImpl, ArrayRef, ArrayResult, StructArray};
30use crate::array::ArrayBuilderImpl;
31use crate::array::data_chunk_iter::RowRef;
32use crate::bitmap::{Bitmap, BitmapBuilder};
33use crate::field_generator::{FieldGeneratorImpl, VarcharProperty};
34use crate::hash::HashCode;
35use crate::row::Row;
36use crate::types::{DataType, DatumRef, StructType, ToOwnedDatum, ToText};
37use crate::util::chunk_coalesce::DataChunkBuilder;
38use crate::util::hash_util::finalize_hashers;
39use crate::util::iter_util::ZipEqFast;
40use crate::util::value_encoding::{
41    ValueRowSerializer, estimate_serialize_datum_size, serialize_datum_into,
42    try_get_exact_serialize_datum_size,
43};
44
/// [`DataChunk`] is a collection of columns,
/// with a visibility mask over its rows.
/// For instance, we could have a [`DataChunk`] of this format.
///
/// | v1 | v2 | v3 |
/// |----|----|----|
/// | 1  | a  | t  |
/// | 2  | b  | f  |
/// | 3  | c  | t  |
/// | 4  | d  | f  |
///
/// Our columns are v1, v2, v3.
/// Then, if the visibility mask hides rows 2 and 4,
/// we will only have these rows visible:
///
/// | v1 | v2 | v3 |
/// |----|----|----|
/// | 1  | a  | t  |
/// | 3  | c  | t  |
///
/// Hidden rows remain physically present in every column until the chunk is compacted
/// (see `DataChunk::compact`). `DataChunk::new` asserts that every column has the same
/// length as the visibility bitmap; constructors such as `from_parts` do not check this.
#[derive(Clone, PartialEq)]
#[must_use]
pub struct DataChunk {
    /// The columns of the chunk. Held in an `Arc<[_]>`, so cloning a chunk only bumps a
    /// reference count instead of copying the column list.
    columns: Arc<[ArrayRef]>,
    /// One bit per physical row; a set bit marks the row as visible.
    visibility: Bitmap,
}
70
71impl DataChunk {
72    pub(crate) const PRETTY_TABLE_PRESET: &'static str = "||--+-++|    ++++++";
73
74    /// Create a `DataChunk` with `columns` and visibility.
75    ///
76    /// The visibility can either be a `Bitmap` or a simple cardinality number.
77    pub fn new(columns: Vec<ArrayRef>, visibility: impl Into<Bitmap>) -> Self {
78        let visibility = visibility.into();
79        let capacity = visibility.len();
80        for column in &columns {
81            assert_eq!(capacity, column.len());
82        }
83
84        DataChunk {
85            columns: columns.into(),
86            visibility,
87        }
88    }
89
90    /// `new_dummy` creates a data chunk without columns but only a cardinality.
91    pub fn new_dummy(cardinality: usize) -> Self {
92        DataChunk {
93            columns: Arc::new([]),
94            visibility: Bitmap::ones(cardinality),
95        }
96    }
97
98    /// Build a `DataChunk` with rows.
99    ///
100    /// Panics if the `rows` is empty.
101    ///
102    /// Should prefer using [`DataChunkBuilder`] instead to avoid unnecessary allocation
103    /// of rows.
104    pub fn from_rows(rows: &[impl Row], data_types: &[DataType]) -> Self {
105        // `append_one_row` will cause the builder to finish immediately once capacity is met.
106        // Hence, we allocate an extra row here, to avoid the builder finishing prematurely.
107        // This just makes the code cleaner, since we can loop through all rows, and consume it finally.
108        // TODO: introduce `new_unlimited` to decouple memory reservation from builder capacity.
109        let mut builder = DataChunkBuilder::new(data_types.to_vec(), rows.len() + 1);
110
111        for row in rows {
112            let none = builder.append_one_row(row);
113            debug_assert!(none.is_none());
114        }
115
116        builder.consume_all().expect("chunk should not be empty")
117    }
118
119    /// Return the next visible row index on or after `row_idx`.
120    pub fn next_visible_row_idx(&self, row_idx: usize) -> Option<usize> {
121        self.visibility.next_set_bit(row_idx)
122    }
123
124    pub fn into_parts(self) -> (Vec<ArrayRef>, Bitmap) {
125        (self.columns.to_vec(), self.visibility)
126    }
127
128    pub fn into_parts_v2(self) -> (Arc<[ArrayRef]>, Bitmap) {
129        (self.columns, self.visibility)
130    }
131
132    pub fn from_parts(columns: Arc<[ArrayRef]>, visibilities: Bitmap) -> Self {
133        Self {
134            columns,
135            visibility: visibilities,
136        }
137    }
138
139    pub fn dimension(&self) -> usize {
140        self.columns.len()
141    }
142
143    // TODO(rc): shall we rename this to `visible_size`? I sometimes find this confused with `capacity`.
144    /// `cardinality` returns the number of visible tuples
145    pub fn cardinality(&self) -> usize {
146        self.visibility.count_ones()
147    }
148
149    // TODO(rc): shall we rename this to `size`?
150    /// `capacity` returns physical length of any chunk column
151    pub fn capacity(&self) -> usize {
152        self.visibility.len()
153    }
154
155    pub fn selectivity(&self) -> f64 {
156        if self.visibility.is_empty() {
157            0.0
158        } else if self.visibility.all() {
159            1.0
160        } else {
161            self.visibility.count_ones() as f64 / self.visibility.len() as f64
162        }
163    }
164
165    pub fn with_visibility(&self, visibility: impl Into<Bitmap>) -> Self {
166        DataChunk {
167            columns: self.columns.clone(),
168            visibility: visibility.into(),
169        }
170    }
171
172    pub fn visibility(&self) -> &Bitmap {
173        &self.visibility
174    }
175
176    pub fn set_visibility(&mut self, visibility: Bitmap) {
177        assert_eq!(visibility.len(), self.capacity());
178        self.visibility = visibility;
179    }
180
181    pub fn is_compacted(&self) -> bool {
182        self.visibility.all()
183    }
184
185    pub fn column_at(&self, idx: usize) -> &ArrayRef {
186        &self.columns[idx]
187    }
188
189    pub fn columns(&self) -> &[ArrayRef] {
190        &self.columns
191    }
192
193    /// Returns the data types of all columns.
194    pub fn data_types(&self) -> Vec<DataType> {
195        self.columns.iter().map(|col| col.data_type()).collect()
196    }
197
198    /// Divides one chunk into two at an column index.
199    ///
200    /// # Panics
201    ///
202    /// Panics if `idx > columns.len()`.
203    pub fn split_column_at(&self, idx: usize) -> (Self, Self) {
204        let (left, right) = self.columns.split_at(idx);
205        let left = DataChunk::new(left.to_vec(), self.visibility.clone());
206        let right = DataChunk::new(right.to_vec(), self.visibility.clone());
207        (left, right)
208    }
209
210    pub fn to_protobuf(&self) -> PbDataChunk {
211        assert!(self.visibility.all(), "must be compacted before transfer");
212        let mut proto = PbDataChunk {
213            cardinality: self.cardinality() as u32,
214            columns: Default::default(),
215        };
216        let column_ref = &mut proto.columns;
217        for array in &*self.columns {
218            column_ref.push(array.to_protobuf());
219        }
220        proto
221    }
222
223    /// `compact` will convert the chunk to compact format.
224    /// Compacting removes the hidden rows, and returns a new visibility
225    /// mask which indicates this.
226    ///
227    /// `compact` has trade-offs:
228    ///
229    /// Cost:
230    /// It has to rebuild the each column, meaning it will incur cost
231    /// of copying over bytes from the original column array to the new one.
232    ///
233    /// Benefit:
234    /// The main benefit is that the data chunk is smaller, taking up less memory.
235    /// We can also save the cost of iterating over many hidden rows.
236    pub fn compact(self) -> Self {
237        if self.visibility.all() {
238            return self;
239        }
240        let cardinality = self.visibility.count_ones();
241        let columns = self
242            .columns
243            .iter()
244            .map(|col| {
245                let array = col;
246                array.compact(&self.visibility, cardinality).into()
247            })
248            .collect::<Vec<_>>();
249        Self::new(columns, Bitmap::ones(cardinality))
250    }
251
252    /// Scatter a compacted chunk to a new chunk with the given visibility.
253    pub fn uncompact(self, vis: Bitmap) -> Self {
254        let mut uncompact_builders: Vec<_> = self
255            .columns
256            .iter()
257            .map(|c| c.create_builder(vis.len()))
258            .collect();
259        let mut last_u = None;
260
261        for (idx, u) in vis.iter_ones().enumerate() {
262            // pad invisible rows with NULL
263            let zeros = if let Some(last_u) = last_u {
264                u - last_u - 1
265            } else {
266                u
267            };
268            for _ in 0..zeros {
269                uncompact_builders
270                    .iter_mut()
271                    .for_each(|builder| builder.append_null());
272            }
273            uncompact_builders
274                .iter_mut()
275                .zip_eq_fast(self.columns.iter())
276                .for_each(|(builder, c)| builder.append(c.datum_at(idx)));
277            last_u = Some(u);
278        }
279        let zeros = if let Some(last_u) = last_u {
280            vis.len() - last_u - 1
281        } else {
282            vis.len()
283        };
284        for _ in 0..zeros {
285            uncompact_builders
286                .iter_mut()
287                .for_each(|builder| builder.append_null());
288        }
289        let array: Vec<_> = uncompact_builders
290            .into_iter()
291            .map(|builder| Arc::new(builder.finish()))
292            .collect();
293
294        Self::new(array, vis)
295    }
296
297    /// Convert the chunk to compact format.
298    ///
299    /// If the chunk is not compacted, return a new compacted chunk, otherwise return a reference to self.
300    pub fn compact_cow(&self) -> Cow<'_, Self> {
301        if self.visibility.all() {
302            return Cow::Borrowed(self);
303        }
304        let cardinality = self.visibility.count_ones();
305        let columns = self
306            .columns
307            .iter()
308            .map(|col| {
309                let array = col;
310                array.compact(&self.visibility, cardinality).into()
311            })
312            .collect::<Vec<_>>();
313        Cow::Owned(Self::new(columns, Bitmap::ones(cardinality)))
314    }
315
316    pub fn from_protobuf(proto: &PbDataChunk) -> ArrayResult<Self> {
317        let mut columns = vec![];
318        for any_col in proto.get_columns() {
319            let cardinality = proto.get_cardinality() as usize;
320            columns.push(ArrayImpl::from_protobuf(any_col, cardinality)?.into());
321        }
322
323        let chunk = DataChunk::new(columns, proto.cardinality as usize);
324        Ok(chunk)
325    }
326
327    /// `rechunk` creates a new vector of data chunk whose size is `each_size_limit`.
328    /// When the total cardinality of all the chunks is not evenly divided by the `each_size_limit`,
329    /// the last new chunk will be the remainder.
330    pub fn rechunk(chunks: &[DataChunk], each_size_limit: usize) -> ArrayResult<Vec<DataChunk>> {
331        let Some(data_types) = chunks.first().map(|c| c.data_types()) else {
332            return Ok(Vec::new());
333        };
334
335        let mut builder = DataChunkBuilder::new(data_types, each_size_limit);
336        let mut outputs = Vec::new();
337
338        for chunk in chunks {
339            for output in builder.append_chunk(chunk.clone()) {
340                outputs.push(output);
341            }
342        }
343        if let Some(output) = builder.consume_all() {
344            outputs.push(output);
345        }
346
347        Ok(outputs)
348    }
349
350    /// Compute hash values for each row. The number of the returning `HashCodes` is `self.capacity()`.
351    /// When `skip_invisible_row` is true, the `HashCode` for the invisible rows is arbitrary.
352    pub fn get_hash_values<H: BuildHasher>(
353        &self,
354        column_idxes: &[usize],
355        hasher_builder: H,
356    ) -> Vec<HashCode<H>> {
357        let len = self.capacity();
358        let mut states = Vec::with_capacity(len);
359        states.resize_with(len, || hasher_builder.build_hasher());
360        // Compute hash for the specified columns.
361        for column_idx in column_idxes {
362            let array = self.column_at(*column_idx);
363            array.hash_vec(&mut states[..], self.visibility());
364        }
365        finalize_hashers(&states[..])
366            .into_iter()
367            .map(|hash_code| hash_code.into())
368            .collect_vec()
369    }
370
371    /// Random access a tuple in a data chunk. Return in a row format.
372    /// # Arguments
373    /// * `pos` - Index of look up tuple
374    /// * `RowRef` - Reference of data tuple
375    /// * bool - whether this tuple is visible
376    pub fn row_at(&self, pos: usize) -> (RowRef<'_>, bool) {
377        let row = self.row_at_unchecked_vis(pos);
378        let vis = self.visibility.is_set(pos);
379        (row, vis)
380    }
381
382    /// Random access a tuple in a data chunk. Return in a row format.
383    /// Note that this function do not return whether the row is visible.
384    /// # Arguments
385    /// * `pos` - Index of look up tuple
386    pub fn row_at_unchecked_vis(&self, pos: usize) -> RowRef<'_> {
387        RowRef::new(self, pos)
388    }
389
390    /// Returns a table-like text representation of the `DataChunk`.
391    pub fn to_pretty(&self) -> impl Display + use<> {
392        use comfy_table::Table;
393
394        if self.cardinality() == 0 {
395            return Either::Left("(empty)");
396        }
397
398        let mut table = Table::new();
399        table.load_preset(Self::PRETTY_TABLE_PRESET);
400
401        for row in self.rows() {
402            let cells: Vec<_> = row
403                .iter()
404                .map(|v| {
405                    match v {
406                        None => "".to_owned(), // NULL
407                        Some(scalar) => scalar.to_text(),
408                    }
409                })
410                .collect();
411            table.add_row(cells);
412        }
413
414        Either::Right(table)
415    }
416
417    /// Keep the specified columns and set the rest elements to null.
418    ///
419    /// # Example
420    ///
421    /// ```text
422    /// i i i                            i i i
423    /// 1 2 3  --> keep_columns([1]) --> . 2 .
424    /// 4 5 6                            . 5 .
425    /// ```
426    pub fn keep_columns(&self, column_indices: &[usize]) -> Self {
427        let capacity: usize = self.capacity();
428        let columns = (self.columns.iter().enumerate())
429            .map(|(i, column)| {
430                if column_indices.contains(&i) {
431                    column.clone()
432                } else {
433                    let mut builder = column.create_builder(capacity);
434                    builder.append_n(capacity, None as DatumRef<'_>);
435                    builder.finish().into()
436                }
437            })
438            .collect();
439        DataChunk {
440            columns,
441            visibility: self.visibility.clone(),
442        }
443    }
444
445    /// Reorder (and possibly remove) columns.
446    ///
447    /// e.g. if `indices` is `[2, 1, 0]`, and the chunk contains column `[a, b, c]`, then the output
448    /// will be `[c, b, a]`. If `indices` is [2, 0], then the output will be `[c, a]`.
449    /// If the input mapping is identity mapping, no reorder will be performed.
450    pub fn project(&self, indices: &[usize]) -> Self {
451        Self {
452            columns: indices.iter().map(|i| self.columns[*i].clone()).collect(),
453            visibility: self.visibility.clone(),
454        }
455    }
456
457    /// Reorder columns and set visibility.
458    pub fn project_with_vis(&self, indices: &[usize], visibility: Bitmap) -> Self {
459        assert_eq!(visibility.len(), self.capacity());
460        Self {
461            columns: indices.iter().map(|i| self.columns[*i].clone()).collect(),
462            visibility,
463        }
464    }
465
466    /// Reorder rows by indexes.
467    pub fn reorder_rows(&self, indexes: &[usize]) -> Self {
468        let mut array_builders: Vec<ArrayBuilderImpl> = self
469            .columns
470            .iter()
471            .map(|col| col.create_builder(indexes.len()))
472            .collect();
473        for &i in indexes {
474            for (builder, col) in array_builders.iter_mut().zip_eq_fast(self.columns.iter()) {
475                builder.append(col.value_at(i));
476            }
477        }
478        let columns = array_builders
479            .into_iter()
480            .map(|builder| builder.finish().into())
481            .collect();
482        DataChunk::new(columns, indexes.len())
483    }
484
485    /// Partition fixed size datums and variable length ones.
486    /// ---
487    /// In some cases, we have fixed size for the entire column,
488    /// when the datatypes are fixed size or the datums are constants.
489    /// As such we can compute the size for it just once for the column.
490    ///
491    /// Otherwise, for variable sized datatypes, such as `varchar`,
492    /// we have to individually compute their sizes per row.
493    fn partition_sizes(&self) -> (usize, Vec<&ArrayRef>) {
494        let mut col_variable: Vec<&ArrayRef> = vec![];
495        let mut row_len_fixed: usize = 0;
496        for c in &*self.columns {
497            if let Some(field_len) = try_get_exact_serialize_datum_size(c) {
498                row_len_fixed += field_len;
499            } else {
500                col_variable.push(c);
501            }
502        }
503        (row_len_fixed, col_variable)
504    }
505
506    unsafe fn compute_size_of_variable_cols_in_row(
507        variable_cols: &[&ArrayRef],
508        row_idx: usize,
509    ) -> usize {
510        unsafe {
511            variable_cols
512                .iter()
513                .map(|col| estimate_serialize_datum_size(col.value_at_unchecked(row_idx)))
514                .sum::<usize>()
515        }
516    }
517
518    unsafe fn init_buffer(
519        row_len_fixed: usize,
520        variable_cols: &[&ArrayRef],
521        row_idx: usize,
522    ) -> Vec<u8> {
523        unsafe {
524            Vec::with_capacity(
525                row_len_fixed + Self::compute_size_of_variable_cols_in_row(variable_cols, row_idx),
526            )
527        }
528    }
529
530    /// Serialize each row into value encoding bytes.
531    ///
532    /// The returned vector's size is `self.capacity()` and for the invisible row will give a empty
533    /// bytes.
534    // Note(bugen): should we exclude the invisible rows in the output so that the caller won't need
535    // to handle visibility again?
536    pub fn serialize(&self) -> Vec<Bytes> {
537        let buffers = if !self.visibility.all() {
538            let rows_num = self.visibility.len();
539            let mut buffers: Vec<Vec<u8>> = vec![];
540            let (row_len_fixed, col_variable) = self.partition_sizes();
541
542            // First initialize buffer with the right size to avoid re-allocations
543            for i in 0..rows_num {
544                // SAFETY(value_at_unchecked): the idx is always in bound.
545                unsafe {
546                    if self.visibility.is_set_unchecked(i) {
547                        buffers.push(Self::init_buffer(row_len_fixed, &col_variable, i));
548                    } else {
549                        buffers.push(vec![]);
550                    }
551                }
552            }
553
554            // Then do the actual serialization
555            for c in &*self.columns {
556                assert_eq!(c.len(), rows_num);
557                for (i, buffer) in buffers.iter_mut().enumerate() {
558                    // SAFETY(value_at_unchecked): the idx is always in bound.
559                    unsafe {
560                        if self.visibility.is_set_unchecked(i) {
561                            serialize_datum_into(c.value_at_unchecked(i), buffer);
562                        }
563                    }
564                }
565            }
566            buffers
567        } else {
568            let mut buffers: Vec<Vec<u8>> = vec![];
569            let (row_len_fixed, col_variable) = self.partition_sizes();
570            for i in 0..self.visibility.len() {
571                unsafe {
572                    buffers.push(Self::init_buffer(row_len_fixed, &col_variable, i));
573                }
574            }
575            for c in &*self.columns {
576                assert_eq!(c.len(), self.visibility.len());
577                for (i, buffer) in buffers.iter_mut().enumerate() {
578                    // SAFETY(value_at_unchecked): the idx is always in bound.
579                    unsafe {
580                        serialize_datum_into(c.value_at_unchecked(i), buffer);
581                    }
582                }
583            }
584            buffers
585        };
586
587        buffers.into_iter().map(|item| item.into()).collect_vec()
588    }
589
590    /// Serialize each row into bytes with given serializer.
591    ///
592    /// This is similar to `serialize` but it uses a custom serializer. Prefer `serialize` if
593    /// possible since it might be more efficient due to columnar operations.
594    pub fn serialize_with(&self, serializer: &impl ValueRowSerializer) -> Vec<Bytes> {
595        let mut results = Vec::with_capacity(self.capacity());
596        for row in self.rows_with_holes() {
597            results.push(if let Some(row) = row {
598                serializer.serialize(row).into()
599            } else {
600                Bytes::new()
601            });
602        }
603        results
604    }
605
606    /// Estimate size of hash keys. Their indices in a row are indicated by `column_indices`.
607    /// Size here refers to the number of u8s required to store the serialized datum.
608    pub fn estimate_value_encoding_size(&self, column_indices: &[usize]) -> usize {
609        if self.capacity() == 0 {
610            0
611        } else {
612            column_indices
613                .iter()
614                .map(|idx| {
615                    let datum = self.column_at(*idx).datum_at(0);
616                    estimate_serialize_datum_size(datum)
617                })
618                .sum()
619        }
620    }
621}
622
623impl fmt::Debug for DataChunk {
624    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
625        write!(
626            f,
627            "DataChunk {{ cardinality = {}, capacity = {}, data = \n{} }}",
628            self.cardinality(),
629            self.capacity(),
630            self.to_pretty()
631        )
632    }
633}
634
635impl<'a> From<&'a StructArray> for DataChunk {
636    fn from(array: &'a StructArray) -> Self {
637        Self {
638            columns: array.fields().cloned().collect(),
639            visibility: Bitmap::ones(array.len()),
640        }
641    }
642}
643
644impl EstimateSize for DataChunk {
645    fn estimated_heap_size(&self) -> usize {
646        self.columns
647            .iter()
648            .map(|a| a.estimated_heap_size())
649            .sum::<usize>()
650            + self.visibility.estimated_heap_size()
651    }
652}
653
/// Test utilities for [`DataChunk`].
pub trait DataChunkTestExt {
    /// Seed passed to the field generators used when generating test chunks.
    const SEED: u64 = 0xFF67FEABBAEF76FF;

    /// Parse a chunk from string.
    ///
    /// # Format
    ///
    /// The first non-empty line is a header listing the column types as whitespace-separated
    /// type chars (see below); anything from a whitespace-separated `//` onward is ignored.
    /// Each following non-empty line is one row: one whitespace-separated value per column,
    /// optionally followed by `D` to mark the row invisible, or by a `//` comment.
    /// NULL values are represented as `.`, and `(empty)` stands for an empty string.
    ///
    /// # Panics
    ///
    /// In the [`DataChunk`] implementation: panics on an unsupported type char, a value that
    /// cannot be parsed as its column type, or an unexpected token after the row's values.
    ///
    /// # Example
    /// ```
    /// use risingwave_common::array::{DataChunk, DataChunkTestExt};
    /// let chunk = DataChunk::from_pretty(
    ///     "I I I I      // type chars
    ///      2 5 . .      // '.' means NULL
    ///      2 5 2 6 D    // 'D' means deleted in visibility
    ///      . . 4 8      // ^ comments are ignored
    ///      . . 3 4",
    /// );
    ///
    /// // type chars:
    /// //     B: bool
    /// //     I: i64
    /// //     i: i32
    /// //     F: f64
    /// //     f: f32
    /// //     T: str
    /// //    TS: Timestamp
    /// //    TZ: Timestamptz
    /// //     D: Date
    /// //   SRL: Serial
    /// // <i,f>: struct
    /// //   I[]: list (a `[]` suffix on any type char)
    /// ```
    fn from_pretty(s: &str) -> Self;

    /// Insert one invisible hole after every record.
    ///
    /// In the [`DataChunk`] implementation each hole holds NULL in every column, so the
    /// result has twice as many physical rows with the same visible rows.
    fn with_invisible_holes(self) -> Self
    where
        Self: Sized;

    /// Panic if the chunk is invalid.
    ///
    /// For [`DataChunk`], "valid" means every column is as long as the visibility.
    fn assert_valid(&self);

    /// Generate data chunk when supplied with `chunk_size` and column data types.
    ///
    /// `chunk_offset` varies the generated values between chunks. `visibility_ratio` is the
    /// probability (in `[0, 1]`) that each row is visible; in the [`DataChunk`] implementation
    /// `0.0` and `1.0` produce all-hidden and all-visible chunks respectively.
    fn gen_data_chunk(
        chunk_offset: usize,
        chunk_size: usize,
        data_types: &[DataType],
        string_properties: &VarcharProperty,
        visibility_ratio: f64,
    ) -> Self;

    /// Generate data chunks when supplied with `chunk_size` and column data types.
    ///
    /// Produces `num_of_chunks` chunks; see [`DataChunkTestExt::gen_data_chunk`] for the
    /// meaning of the remaining parameters.
    fn gen_data_chunks(
        num_of_chunks: usize,
        chunk_size: usize,
        data_types: &[DataType],
        string_properties: &VarcharProperty,
        visibility_ratio: f64,
    ) -> Vec<Self>
    where
        Self: Sized;
}
720
721impl DataChunkTestExt for DataChunk {
722    fn from_pretty(s: &str) -> Self {
723        use crate::types::ScalarImpl;
724        fn parse_type(s: &str) -> DataType {
725            if let Some(s) = s.strip_suffix("[]") {
726                return DataType::List(Box::new(parse_type(s)));
727            }
728            match s {
729                "B" => DataType::Boolean,
730                "I" => DataType::Int64,
731                "i" => DataType::Int32,
732                "F" => DataType::Float64,
733                "f" => DataType::Float32,
734                "TS" => DataType::Timestamp,
735                "TZ" => DataType::Timestamptz,
736                "T" => DataType::Varchar,
737                "SRL" => DataType::Serial,
738                "D" => DataType::Date,
739                array if array.starts_with('<') && array.ends_with('>') => {
740                    DataType::Struct(StructType::unnamed(
741                        array[1..array.len() - 1]
742                            .split(',')
743                            .map(parse_type)
744                            .collect(),
745                    ))
746                }
747                _ => todo!("unsupported type: {s:?}"),
748            }
749        }
750
751        let mut lines = s.split('\n').filter(|l| !l.trim().is_empty());
752        // initialize array builders from the first line
753        let header = lines.next().unwrap().trim();
754        let datatypes = header
755            .split_ascii_whitespace()
756            .take_while(|c| *c != "//")
757            .map(parse_type)
758            .collect::<Vec<_>>();
759        let mut array_builders = datatypes
760            .iter()
761            .map(|ty| ty.create_array_builder(1))
762            .collect::<Vec<_>>();
763        let mut visibility = vec![];
764        for line in lines {
765            let mut token = line.trim().split_ascii_whitespace();
766            // allow `zip` since `token` may longer than `array_builders`
767            #[allow(clippy::disallowed_methods)]
768            for ((builder, ty), val_str) in
769                array_builders.iter_mut().zip(&datatypes).zip(&mut token)
770            {
771                let datum = match val_str {
772                    "." => None,
773                    "(empty)" => Some("".into()),
774                    _ => Some(ScalarImpl::from_text(val_str, ty).unwrap()),
775                };
776                builder.append(datum);
777            }
778            let visible = match token.next() {
779                None | Some("//") => true,
780                Some("D") => false,
781                Some(t) => panic!("invalid token: {t:?}"),
782            };
783            visibility.push(visible);
784        }
785        let columns = array_builders
786            .into_iter()
787            .map(|builder| builder.finish().into())
788            .collect();
789        let vis = Bitmap::from_iter(visibility);
790        let chunk = DataChunk::new(columns, vis);
791        chunk.assert_valid();
792        chunk
793    }
794
795    fn with_invisible_holes(self) -> Self
796    where
797        Self: Sized,
798    {
799        let (cols, vis) = self.into_parts();
800        let n = vis.len();
801        let mut new_vis = BitmapBuilder::with_capacity(n * 2);
802        for b in vis.iter() {
803            new_vis.append(b);
804            new_vis.append(false);
805        }
806        let new_cols = cols
807            .into_iter()
808            .map(|col| {
809                let arr = col;
810                let mut builder = arr.create_builder(n * 2);
811                for v in arr.iter() {
812                    builder.append(v.to_owned_datum());
813                    builder.append_null();
814                }
815
816                builder.finish().into()
817            })
818            .collect();
819        let chunk = DataChunk::new(new_cols, new_vis.finish());
820        chunk.assert_valid();
821        chunk
822    }
823
824    fn assert_valid(&self) {
825        let cols = self.columns();
826        let vis = &self.visibility;
827        let n = vis.len();
828        for col in cols {
829            assert_eq!(col.len(), n);
830        }
831    }
832
833    fn gen_data_chunk(
834        chunk_offset: usize,
835        chunk_size: usize,
836        data_types: &[DataType],
837        varchar_properties: &VarcharProperty,
838        visibility_percent: f64,
839    ) -> Self {
840        let vis = if visibility_percent == 0.0 {
841            Bitmap::zeros(chunk_size)
842        } else if visibility_percent == 1.0 {
843            Bitmap::ones(chunk_size)
844        } else {
845            let mut rng = SmallRng::from_seed([0; 32]);
846            let mut vis_builder = BitmapBuilder::with_capacity(chunk_size);
847            for _i in 0..chunk_size {
848                vis_builder.append(rng.random_bool(visibility_percent));
849            }
850            vis_builder.finish()
851        };
852
853        let mut columns = Vec::new();
854        // Generate columns of this chunk.
855        for data_type in data_types {
856            let mut array_builder = data_type.create_array_builder(chunk_size);
857            for j in 0..chunk_size {
858                let offset = ((chunk_offset + 1) * (j + 1)) as u64;
859                match data_type {
860                    DataType::Varchar => {
861                        let datum =
862                            FieldGeneratorImpl::with_varchar(varchar_properties, Self::SEED)
863                                .generate_datum(offset);
864                        array_builder.append(&datum);
865                    }
866                    DataType::Timestamp => {
867                        let datum =
868                            FieldGeneratorImpl::with_timestamp(None, None, None, Self::SEED)
869                                .expect("create timestamp generator should succeed")
870                                .generate_datum(offset);
871                        array_builder.append(datum);
872                    }
873                    DataType::Timestamptz => {
874                        let datum =
875                            FieldGeneratorImpl::with_timestamptz(None, None, None, Self::SEED)
876                                .expect("create timestamptz generator should succeed")
877                                .generate_datum(offset);
878                        array_builder.append(datum);
879                    }
880                    _ if data_type.is_numeric() => {
881                        let mut data_gen = FieldGeneratorImpl::with_number_random(
882                            data_type.clone(),
883                            None,
884                            None,
885                            Self::SEED,
886                        )
887                        .unwrap();
888                        let datum = data_gen.generate_datum(offset);
889                        array_builder.append(datum);
890                    }
891                    _ => todo!("unsupported type: {data_type:?}"),
892                }
893            }
894            columns.push(array_builder.finish().into());
895        }
896        DataChunk::new(columns, vis)
897    }
898
899    fn gen_data_chunks(
900        num_of_chunks: usize,
901        chunk_size: usize,
902        data_types: &[DataType],
903        varchar_properties: &VarcharProperty,
904        visibility_percent: f64,
905    ) -> Vec<Self> {
906        (0..num_of_chunks)
907            .map(|i| {
908                Self::gen_data_chunk(
909                    i,
910                    chunk_size,
911                    data_types,
912                    varchar_properties,
913                    visibility_percent,
914                )
915            })
916            .collect()
917    }
918}
919
#[cfg(test)]
mod tests {
    use crate::array::*;
    use crate::row::Row;

    /// `rechunk` must regroup rows into chunks of `new_chunk_size`, plus one trailing
    /// partial chunk, while preserving row order.
    #[test]
    fn test_rechunk() {
        let test_case = |num_chunks: usize, chunk_size: usize, new_chunk_size: usize| {
            // Input: `num_chunks` single-column chunks holding 0, 1, 2, ... in order.
            let chunks: Vec<DataChunk> = (0..num_chunks)
                .map(|chunk_idx| {
                    let start = chunk_size * chunk_idx;
                    let mut builder = PrimitiveArrayBuilder::<i32>::new(0);
                    for v in start..start + chunk_size {
                        builder.append(Some(v as i32));
                    }
                    DataChunk::new(vec![Arc::new(builder.finish().into())], chunk_size)
                })
                .collect();

            // Expected layout: full chunks of `new_chunk_size`, then the remainder if any.
            let total_size = num_chunks * chunk_size;
            let mut expected_sizes = vec![new_chunk_size; total_size / new_chunk_size];
            match total_size % new_chunk_size {
                0 => {}
                remainder => expected_sizes.push(remainder),
            }

            let new_chunks = DataChunk::rechunk(&chunks, new_chunk_size).unwrap();
            assert_eq!(new_chunks.len(), expected_sizes.len());
            // Check the cardinality of every produced chunk.
            for (new_chunk, expected_size) in new_chunks.iter().zip(&expected_sizes) {
                assert_eq!(*expected_size, new_chunk.capacity());
            }

            // Check that the values 0..total_size appear in order across the new chunks.
            let mut expected_val = 0usize;
            for (new_chunk, &size) in new_chunks.iter().zip(&expected_sizes) {
                let column = new_chunk.column_at(0).as_int32();
                for row_idx in 0..size {
                    assert_eq!(column.value_at(row_idx).unwrap(), expected_val as i32);
                    expected_val += 1;
                }
            }
        };

        test_case(0, 0, 1);
        test_case(0, 10, 1);
        test_case(10, 0, 1);
        test_case(1, 1, 6);
        test_case(1, 10, 11);
        test_case(2, 3, 6);
        test_case(5, 5, 6);
        test_case(10, 10, 7);
    }

    /// Every row yielded by `rows()` exposes the datum stored in each column.
    #[test]
    fn test_chunk_iter() {
        let num_of_columns: usize = 2;
        let length = 5;
        // Column `i` holds the constant value `i` in every row.
        let columns: Vec<ArrayRef> = (0..num_of_columns)
            .map(|i| {
                let mut builder = PrimitiveArrayBuilder::<i32>::new(length);
                for _ in 0..length {
                    builder.append(Some(i as i32));
                }
                Arc::new(builder.finish().into())
            })
            .collect();
        let chunk: DataChunk = DataChunk::new(columns, length);
        for row in chunk.rows() {
            for col_idx in 0..num_of_columns {
                let datum = row.datum_at(col_idx).unwrap();
                assert_eq!(datum.into_int32(), col_idx as i32);
            }
        }
    }

    /// Pretty printing renders NULL as an empty cell.
    #[test]
    fn test_to_pretty_string() {
        let non_null = I64Array::from_iter([1, 2, 3, 4]);
        let with_nulls = I64Array::from_iter([Some(6), None, Some(7), None]);
        let chunk = DataChunk::new(
            vec![Arc::new(non_null.into()), Arc::new(with_nulls.into())],
            4,
        );
        let rendered = chunk.to_pretty().to_string();
        assert_eq!(
            rendered,
            "\
+---+---+
| 1 | 6 |
| 2 |   |
| 3 | 7 |
| 4 |   |
+---+---+"
        );
    }

    /// A chunk without columns keeps its cardinality, including across protobuf serde.
    #[test]
    fn test_no_column_chunk() {
        let chunk = DataChunk::new_dummy(10);
        assert_eq!(chunk.rows().count(), 10);

        let encoded = chunk.to_protobuf();
        let decoded = DataChunk::from_protobuf(&encoded).unwrap();
        assert_eq!(decoded.rows().count(), 10);
        assert_eq!(decoded.cardinality(), 10);
    }

    /// `project` reorders, drops, or keeps columns according to the given indices.
    #[test]
    fn reorder_columns() {
        let chunk = DataChunk::from_pretty(
            "I I I
             2 5 1
             4 9 2
             6 9 3",
        );
        // Full reversal of the column order.
        assert_eq!(
            chunk.project(&[2, 1, 0]),
            DataChunk::from_pretty(
                "I I I
                 1 5 2
                 2 9 4
                 3 9 6",
            )
        );
        // Dropping the middle column and swapping the remaining two.
        assert_eq!(
            chunk.project(&[2, 0]),
            DataChunk::from_pretty(
                "I I
                 1 2
                 2 4
                 3 6",
            )
        );
        // Identity projection is a no-op.
        assert_eq!(chunk.project(&[0, 1, 2]), chunk);
        // Projecting no columns still keeps the row count.
        let no_columns = chunk.project(&[]);
        assert_eq!(no_columns.cardinality(), 3);
    }

    /// Estimated heap size grows with the number of `I` (int64) columns.
    #[test]
    fn test_chunk_estimated_size() {
        // Three int64 columns of three rows each.
        assert_eq!(
            72,
            DataChunk::from_pretty(
                "I I I
                 1 5 2
                 2 9 4
                 3 9 6",
            )
            .estimated_heap_size()
        );
        // Two int64 columns of three rows each.
        assert_eq!(
            48,
            DataChunk::from_pretty(
                "I I
                 1 2
                 2 4
                 3 6",
            )
            .estimated_heap_size()
        );
    }
}