1use std::fmt;
16use std::fmt::Display;
17use std::hash::BuildHasher;
18use std::sync::Arc;
19
20use bytes::Bytes;
21use either::Either;
22use itertools::Itertools;
23use rand::rngs::SmallRng;
24use rand::{Rng, SeedableRng};
25use risingwave_common_estimate_size::EstimateSize;
26use risingwave_pb::data::PbDataChunk;
27
28use super::{Array, ArrayImpl, ArrayRef, ArrayResult, StructArray};
29use crate::array::ArrayBuilderImpl;
30use crate::array::data_chunk_iter::RowRef;
31use crate::bitmap::{Bitmap, BitmapBuilder};
32use crate::field_generator::{FieldGeneratorImpl, VarcharProperty};
33use crate::hash::HashCode;
34use crate::row::Row;
35use crate::types::{DataType, DatumRef, MapType, StructType, ToOwnedDatum, ToText};
36use crate::util::chunk_coalesce::DataChunkBuilder;
37use crate::util::hash_util::finalize_hashers;
38use crate::util::iter_util::ZipEqFast;
39use crate::util::value_encoding::{
40 ValueRowSerializer, estimate_serialize_datum_size, serialize_datum_into,
41 try_get_exact_serialize_datum_size,
42};
43
/// A batch of rows stored column by column, together with a per-row visibility bitmap.
///
/// Every column array has the same length as `visibility` (checked by [`DataChunk::new`]).
/// A row whose visibility bit is unset stays physically present in the columns but is not
/// counted by [`DataChunk::cardinality`].
#[derive(Clone, PartialEq)]
#[must_use]
pub struct DataChunk {
    /// One array per column, shared behind an `Arc`.
    columns: Arc<[ArrayRef]>,
    /// One bit per row; a set bit marks the row as visible.
    visibility: Bitmap,
}
69
70impl DataChunk {
/// `comfy_table` preset used by `to_pretty`.
///
/// A preset assigns one character to each of the 19 table components, in order; a space
/// means "do not draw this component". Positions 9 through 12 (horizontal lines and the
/// middle/left/right row intersections) are blank so that no separator is drawn between
/// rows, while the outer border, the column separators and the four corners are kept.
pub(crate) const PRETTY_TABLE_PRESET: &'static str = "||--+-++|    ++++++";
72
73 pub fn new(columns: Vec<ArrayRef>, visibility: impl Into<Bitmap>) -> Self {
77 let visibility = visibility.into();
78 let capacity = visibility.len();
79 for column in &columns {
80 assert_eq!(capacity, column.len());
81 }
82
83 DataChunk {
84 columns: columns.into(),
85 visibility,
86 }
87 }
88
89 pub fn new_dummy(cardinality: usize) -> Self {
91 DataChunk {
92 columns: Arc::new([]),
93 visibility: Bitmap::ones(cardinality),
94 }
95 }
96
97 pub fn from_rows(rows: &[impl Row], data_types: &[DataType]) -> Self {
104 let mut builder = DataChunkBuilder::new(data_types.to_vec(), rows.len() + 1);
109
110 for row in rows {
111 let none = builder.append_one_row(row);
112 debug_assert!(none.is_none());
113 }
114
115 builder.consume_all().expect("chunk should not be empty")
116 }
117
118 pub fn next_visible_row_idx(&self, row_idx: usize) -> Option<usize> {
120 self.visibility.next_set_bit(row_idx)
121 }
122
123 pub fn into_parts(self) -> (Vec<ArrayRef>, Bitmap) {
124 (self.columns.to_vec(), self.visibility)
125 }
126
127 pub fn into_parts_v2(self) -> (Arc<[ArrayRef]>, Bitmap) {
128 (self.columns, self.visibility)
129 }
130
131 pub fn from_parts(columns: Arc<[ArrayRef]>, visibilities: Bitmap) -> Self {
132 Self {
133 columns,
134 visibility: visibilities,
135 }
136 }
137
138 pub fn dimension(&self) -> usize {
139 self.columns.len()
140 }
141
142 pub fn cardinality(&self) -> usize {
145 self.visibility.count_ones()
146 }
147
148 pub fn rate_limit_permits(&self) -> u64 {
150 self.cardinality() as _
151 }
152
153 pub fn capacity(&self) -> usize {
156 self.visibility.len()
157 }
158
159 pub fn selectivity(&self) -> f64 {
160 if self.visibility.is_empty() {
161 0.0
162 } else if self.visibility.all() {
163 1.0
164 } else {
165 self.visibility.count_ones() as f64 / self.visibility.len() as f64
166 }
167 }
168
169 pub fn with_visibility(&self, visibility: impl Into<Bitmap>) -> Self {
170 DataChunk {
171 columns: self.columns.clone(),
172 visibility: visibility.into(),
173 }
174 }
175
176 pub fn visibility(&self) -> &Bitmap {
177 &self.visibility
178 }
179
180 pub fn set_visibility(&mut self, visibility: Bitmap) {
181 assert_eq!(visibility.len(), self.capacity());
182 self.visibility = visibility;
183 }
184
185 pub fn is_vis_compacted(&self) -> bool {
188 self.visibility.all()
189 }
190
191 pub fn column_at(&self, idx: usize) -> &ArrayRef {
192 &self.columns[idx]
193 }
194
195 pub fn columns(&self) -> &[ArrayRef] {
196 &self.columns
197 }
198
199 pub fn data_types(&self) -> Vec<DataType> {
201 self.columns.iter().map(|col| col.data_type()).collect()
202 }
203
204 pub fn split_column_at(&self, idx: usize) -> (Self, Self) {
210 let (left, right) = self.columns.split_at(idx);
211 let left = DataChunk::new(left.to_vec(), self.visibility.clone());
212 let right = DataChunk::new(right.to_vec(), self.visibility.clone());
213 (left, right)
214 }
215
216 pub fn to_protobuf(&self) -> PbDataChunk {
217 assert!(self.visibility.all(), "must be compacted before transfer");
218 let mut proto = PbDataChunk {
219 cardinality: self.cardinality() as u32,
220 columns: Default::default(),
221 };
222 let column_ref = &mut proto.columns;
223 for array in &*self.columns {
224 column_ref.push(array.to_protobuf());
225 }
226 proto
227 }
228
229 pub fn compact_vis(self) -> Self {
242 if self.visibility.all() {
243 return self;
244 }
245 let cardinality = self.visibility.count_ones();
246 let columns = self
247 .columns
248 .iter()
249 .map(|col| {
250 let array = col;
251 array.compact_vis(&self.visibility, cardinality).into()
252 })
253 .collect::<Vec<_>>();
254 Self::new(columns, Bitmap::ones(cardinality))
255 }
256
257 pub fn expand_vis(self, vis: Bitmap) -> Self {
259 let mut uncompact_builders: Vec<_> = self
260 .columns
261 .iter()
262 .map(|c| c.create_builder(vis.len()))
263 .collect();
264 let mut last_u = None;
265
266 for (idx, u) in vis.iter_ones().enumerate() {
267 let zeros = if let Some(last_u) = last_u {
269 u - last_u - 1
270 } else {
271 u
272 };
273 for _ in 0..zeros {
274 uncompact_builders
275 .iter_mut()
276 .for_each(|builder| builder.append_null());
277 }
278 uncompact_builders
279 .iter_mut()
280 .zip_eq_fast(self.columns.iter())
281 .for_each(|(builder, c)| builder.append(c.datum_at(idx)));
282 last_u = Some(u);
283 }
284 let zeros = if let Some(last_u) = last_u {
285 vis.len() - last_u - 1
286 } else {
287 vis.len()
288 };
289 for _ in 0..zeros {
290 uncompact_builders
291 .iter_mut()
292 .for_each(|builder| builder.append_null());
293 }
294 let array: Vec<_> = uncompact_builders
295 .into_iter()
296 .map(|builder| Arc::new(builder.finish()))
297 .collect();
298
299 Self::new(array, vis)
300 }
301
302 pub fn from_protobuf(proto: &PbDataChunk) -> ArrayResult<Self> {
303 let mut columns = vec![];
304 for any_col in proto.get_columns() {
305 let cardinality = proto.get_cardinality() as usize;
306 columns.push(ArrayImpl::from_protobuf(any_col, cardinality)?.into());
307 }
308
309 let chunk = DataChunk::new(columns, proto.cardinality as usize);
310 Ok(chunk)
311 }
312
313 pub fn rechunk(chunks: &[DataChunk], each_size_limit: usize) -> ArrayResult<Vec<DataChunk>> {
317 let Some(data_types) = chunks.first().map(|c| c.data_types()) else {
318 return Ok(Vec::new());
319 };
320
321 let mut builder = DataChunkBuilder::new(data_types, each_size_limit);
322 let mut outputs = Vec::new();
323
324 for chunk in chunks {
325 for output in builder.append_chunk(chunk.clone()) {
326 outputs.push(output);
327 }
328 }
329 if let Some(output) = builder.consume_all() {
330 outputs.push(output);
331 }
332
333 Ok(outputs)
334 }
335
336 pub fn get_hash_values<H: BuildHasher>(
339 &self,
340 column_idxes: &[usize],
341 hasher_builder: H,
342 ) -> Vec<HashCode<H>> {
343 let len = self.capacity();
344 let mut states = Vec::with_capacity(len);
345 states.resize_with(len, || hasher_builder.build_hasher());
346 for column_idx in column_idxes {
348 let array = self.column_at(*column_idx);
349 array.hash_vec(&mut states[..], self.visibility());
350 }
351 finalize_hashers(&states[..])
352 .into_iter()
353 .map(|hash_code| hash_code.into())
354 .collect_vec()
355 }
356
357 pub fn row_at(&self, pos: usize) -> (RowRef<'_>, bool) {
363 let row = self.row_at_unchecked_vis(pos);
364 let vis = self.visibility.is_set(pos);
365 (row, vis)
366 }
367
368 pub fn row_at_unchecked_vis(&self, pos: usize) -> RowRef<'_> {
373 RowRef::new(self, pos)
374 }
375
376 pub fn to_pretty(&self) -> impl Display + use<> {
378 use comfy_table::Table;
379
380 if self.cardinality() == 0 {
381 return Either::Left("(empty)");
382 }
383
384 let mut table = Table::new();
385 table.load_preset(Self::PRETTY_TABLE_PRESET);
386
387 for row in self.rows() {
388 let cells: Vec<_> = row
389 .iter()
390 .map(|v| {
391 match v {
392 None => "".to_owned(), Some(scalar) => scalar.to_text(),
394 }
395 })
396 .collect();
397 table.add_row(cells);
398 }
399
400 Either::Right(table)
401 }
402
403 pub fn keep_columns(&self, column_indices: &[usize]) -> Self {
413 let capacity: usize = self.capacity();
414 let columns = (self.columns.iter().enumerate())
415 .map(|(i, column)| {
416 if column_indices.contains(&i) {
417 column.clone()
418 } else {
419 let mut builder = column.create_builder(capacity);
420 builder.append_n(capacity, None as DatumRef<'_>);
421 builder.finish().into()
422 }
423 })
424 .collect();
425 DataChunk {
426 columns,
427 visibility: self.visibility.clone(),
428 }
429 }
430
431 pub fn project(&self, indices: &[usize]) -> Self {
437 Self {
438 columns: indices.iter().map(|i| self.columns[*i].clone()).collect(),
439 visibility: self.visibility.clone(),
440 }
441 }
442
443 pub fn project_with_vis(&self, indices: &[usize], visibility: Bitmap) -> Self {
445 assert_eq!(visibility.len(), self.capacity());
446 Self {
447 columns: indices.iter().map(|i| self.columns[*i].clone()).collect(),
448 visibility,
449 }
450 }
451
452 pub fn reorder_rows(&self, indexes: &[usize]) -> Self {
454 let mut array_builders: Vec<ArrayBuilderImpl> = self
455 .columns
456 .iter()
457 .map(|col| col.create_builder(indexes.len()))
458 .collect();
459 for &i in indexes {
460 for (builder, col) in array_builders.iter_mut().zip_eq_fast(self.columns.iter()) {
461 builder.append(col.value_at(i));
462 }
463 }
464 let columns = array_builders
465 .into_iter()
466 .map(|builder| builder.finish().into())
467 .collect();
468 DataChunk::new(columns, indexes.len())
469 }
470
471 fn partition_sizes(&self) -> (usize, Vec<&ArrayRef>) {
480 let mut col_variable: Vec<&ArrayRef> = vec![];
481 let mut row_len_fixed: usize = 0;
482 for c in &*self.columns {
483 if let Some(field_len) = try_get_exact_serialize_datum_size(c) {
484 row_len_fixed += field_len;
485 } else {
486 col_variable.push(c);
487 }
488 }
489 (row_len_fixed, col_variable)
490 }
491
492 unsafe fn compute_size_of_variable_cols_in_row(
493 variable_cols: &[&ArrayRef],
494 row_idx: usize,
495 ) -> usize {
496 unsafe {
497 variable_cols
498 .iter()
499 .map(|col| estimate_serialize_datum_size(col.value_at_unchecked(row_idx)))
500 .sum::<usize>()
501 }
502 }
503
504 unsafe fn init_buffer(
505 row_len_fixed: usize,
506 variable_cols: &[&ArrayRef],
507 row_idx: usize,
508 ) -> Vec<u8> {
509 unsafe {
510 Vec::with_capacity(
511 row_len_fixed + Self::compute_size_of_variable_cols_in_row(variable_cols, row_idx),
512 )
513 }
514 }
515
516 pub fn serialize(&self) -> Vec<Bytes> {
523 let buffers = if !self.visibility.all() {
524 let rows_num = self.visibility.len();
525 let mut buffers: Vec<Vec<u8>> = vec![];
526 let (row_len_fixed, col_variable) = self.partition_sizes();
527
528 for i in 0..rows_num {
530 unsafe {
532 if self.visibility.is_set_unchecked(i) {
533 buffers.push(Self::init_buffer(row_len_fixed, &col_variable, i));
534 } else {
535 buffers.push(vec![]);
536 }
537 }
538 }
539
540 for c in &*self.columns {
542 assert_eq!(c.len(), rows_num);
543 for (i, buffer) in buffers.iter_mut().enumerate() {
544 unsafe {
546 if self.visibility.is_set_unchecked(i) {
547 serialize_datum_into(c.value_at_unchecked(i), buffer);
548 }
549 }
550 }
551 }
552 buffers
553 } else {
554 let mut buffers: Vec<Vec<u8>> = vec![];
555 let (row_len_fixed, col_variable) = self.partition_sizes();
556 for i in 0..self.visibility.len() {
557 unsafe {
558 buffers.push(Self::init_buffer(row_len_fixed, &col_variable, i));
559 }
560 }
561 for c in &*self.columns {
562 assert_eq!(c.len(), self.visibility.len());
563 for (i, buffer) in buffers.iter_mut().enumerate() {
564 unsafe {
566 serialize_datum_into(c.value_at_unchecked(i), buffer);
567 }
568 }
569 }
570 buffers
571 };
572
573 buffers.into_iter().map(|item| item.into()).collect_vec()
574 }
575
576 pub fn serialize_with(&self, serializer: &impl ValueRowSerializer) -> Vec<Bytes> {
581 let mut results = Vec::with_capacity(self.capacity());
582 for row in self.rows_with_holes() {
583 results.push(if let Some(row) = row {
584 serializer.serialize(row).into()
585 } else {
586 Bytes::new()
587 });
588 }
589 results
590 }
591
592 pub fn estimate_value_encoding_size(&self, column_indices: &[usize]) -> usize {
595 if self.capacity() == 0 {
596 0
597 } else {
598 column_indices
599 .iter()
600 .map(|idx| {
601 let datum = self.column_at(*idx).datum_at(0);
602 estimate_serialize_datum_size(datum)
603 })
604 .sum()
605 }
606 }
607}
608
609impl fmt::Debug for DataChunk {
610 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
611 write!(
612 f,
613 "DataChunk {{ cardinality = {}, capacity = {}, data = \n{} }}",
614 self.cardinality(),
615 self.capacity(),
616 self.to_pretty()
617 )
618 }
619}
620
621impl<'a> From<&'a StructArray> for DataChunk {
622 fn from(array: &'a StructArray) -> Self {
623 Self {
624 columns: array.fields().cloned().collect(),
625 visibility: Bitmap::ones(array.len()),
626 }
627 }
628}
629
630impl EstimateSize for DataChunk {
631 fn estimated_heap_size(&self) -> usize {
632 self.columns
633 .iter()
634 .map(|a| a.estimated_heap_size())
635 .sum::<usize>()
636 + self.visibility.estimated_heap_size()
637 }
638}
639
/// Test helpers for constructing and checking chunks.
pub trait DataChunkTestExt {
    /// Seed handed to the field generators by `gen_data_chunk`.
    const SEED: u64 = 0xFF67FEABBAEF76FF;

    /// Parses a chunk from a whitespace-separated text table.
    ///
    /// Blank lines are ignored. The first remaining line is the header and lists one
    /// type token per column; a `//` token ends the header. Supported tokens:
    /// `B` (boolean), `I` (int64), `i` (int32), `F` (float64), `f` (float32),
    /// `TS` (timestamp), `TZ` (timestamptz), `T` (varchar), `SRL` (serial), `D` (date),
    /// `<t1,t2,...>` (unnamed struct), `t[]` (list of `t`) and `map<k,v>`.
    ///
    /// Each following line is a row with one value per column: `.` is NULL, `(empty)` is
    /// the empty string, anything else is parsed as text of the column's type. After the
    /// values, an optional `D` marks the row invisible and `//` starts a trailing comment.
    ///
    /// Panics on malformed input.
    fn from_pretty(s: &str) -> Self;

    /// Returns a chunk in which every original row is followed by an extra invisible
    /// row whose values are all NULL.
    fn with_invisible_holes(self) -> Self
    where
        Self: Sized;

    /// Panics unless every column has as many rows as the visibility bitmap has bits.
    fn assert_valid(&self);

    /// Generates a chunk of `chunk_size` rows with one column per entry of `data_types`.
    ///
    /// `visibility_ratio` is the probability that a row is visible; `0.0` and `1.0` give
    /// all-invisible and all-visible chunks respectively. `chunk_offset` feeds into the
    /// per-row offset passed to the field generators.
    fn gen_data_chunk(
        chunk_offset: usize,
        chunk_size: usize,
        data_types: &[DataType],
        string_properties: &VarcharProperty,
        visibility_ratio: f64,
    ) -> Self;

    /// Generates `num_of_chunks` chunks with `gen_data_chunk`, using the chunk's index
    /// as its `chunk_offset`.
    fn gen_data_chunks(
        num_of_chunks: usize,
        chunk_size: usize,
        data_types: &[DataType],
        string_properties: &VarcharProperty,
        visibility_ratio: f64,
    ) -> Vec<Self>
    where
        Self: Sized;
}
706
707impl DataChunkTestExt for DataChunk {
708 fn from_pretty(s: &str) -> Self {
709 use crate::types::ScalarImpl;
710 fn parse_type(s: &str) -> DataType {
711 if let Some(s) = s.strip_suffix("[]") {
712 return DataType::list(parse_type(s));
713 }
714
715 if let Some(inner) = s.strip_prefix("map<").and_then(|s| s.strip_suffix('>')) {
718 let mut parts = inner.split(',');
719 let key_type = parts.next().expect("Key type expected");
720 let value_type = parts.next().expect("Value type expected");
721 return DataType::Map(MapType::from_kv(
722 parse_type(key_type),
723 parse_type(value_type),
724 ));
725 }
726
727 match s {
728 "B" => DataType::Boolean,
729 "I" => DataType::Int64,
730 "i" => DataType::Int32,
731 "F" => DataType::Float64,
732 "f" => DataType::Float32,
733 "TS" => DataType::Timestamp,
734 "TZ" => DataType::Timestamptz,
735 "T" => DataType::Varchar,
736 "SRL" => DataType::Serial,
737 "D" => DataType::Date,
738 array if array.starts_with('<') && array.ends_with('>') => {
739 DataType::Struct(StructType::unnamed(
740 array[1..array.len() - 1]
741 .split(',')
742 .map(parse_type)
743 .collect(),
744 ))
745 }
746 _ => todo!("unsupported type: {s:?}"),
747 }
748 }
749
750 let mut lines = s.split('\n').filter(|l| !l.trim().is_empty());
751 let header = lines.next().unwrap().trim();
753 let datatypes = header
754 .split_ascii_whitespace()
755 .take_while(|c| *c != "//")
756 .map(parse_type)
757 .collect::<Vec<_>>();
758 let mut array_builders = datatypes
759 .iter()
760 .map(|ty| ty.create_array_builder(1))
761 .collect::<Vec<_>>();
762 let mut visibility = vec![];
763 for line in lines {
764 let mut token = line.trim().split_ascii_whitespace();
765 #[allow(clippy::disallowed_methods)]
767 for ((builder, ty), val_str) in
768 array_builders.iter_mut().zip(&datatypes).zip(&mut token)
769 {
770 let datum = match val_str {
771 "." => None,
772 "(empty)" => Some("".into()),
773 _ => Some(ScalarImpl::from_text_for_test(val_str, ty).unwrap()),
775 };
776 builder.append(datum);
777 }
778 let visible = match token.next() {
779 None | Some("//") => true,
780 Some("D") => false,
781 Some(t) => panic!("invalid token: {t:?}"),
782 };
783 visibility.push(visible);
784 }
785 let columns = array_builders
786 .into_iter()
787 .map(|builder| builder.finish().into())
788 .collect();
789 let vis = Bitmap::from_iter(visibility);
790 let chunk = DataChunk::new(columns, vis);
791 chunk.assert_valid();
792 chunk
793 }
794
795 fn with_invisible_holes(self) -> Self
796 where
797 Self: Sized,
798 {
799 let (cols, vis) = self.into_parts();
800 let n = vis.len();
801 let mut new_vis = BitmapBuilder::with_capacity(n * 2);
802 for b in vis.iter() {
803 new_vis.append(b);
804 new_vis.append(false);
805 }
806 let new_cols = cols
807 .into_iter()
808 .map(|col| {
809 let arr = col;
810 let mut builder = arr.create_builder(n * 2);
811 for v in arr.iter() {
812 builder.append(v.to_owned_datum());
813 builder.append_null();
814 }
815
816 builder.finish().into()
817 })
818 .collect();
819 let chunk = DataChunk::new(new_cols, new_vis.finish());
820 chunk.assert_valid();
821 chunk
822 }
823
824 fn assert_valid(&self) {
825 let cols = self.columns();
826 let vis = &self.visibility;
827 let n = vis.len();
828 for col in cols {
829 assert_eq!(col.len(), n);
830 }
831 }
832
833 fn gen_data_chunk(
834 chunk_offset: usize,
835 chunk_size: usize,
836 data_types: &[DataType],
837 varchar_properties: &VarcharProperty,
838 visibility_percent: f64,
839 ) -> Self {
840 let vis = if visibility_percent == 0.0 {
841 Bitmap::zeros(chunk_size)
842 } else if visibility_percent == 1.0 {
843 Bitmap::ones(chunk_size)
844 } else {
845 let mut rng = SmallRng::from_seed([0; 32]);
846 let mut vis_builder = BitmapBuilder::with_capacity(chunk_size);
847 for _i in 0..chunk_size {
848 vis_builder.append(rng.random_bool(visibility_percent));
849 }
850 vis_builder.finish()
851 };
852
853 let mut columns = Vec::new();
854 for data_type in data_types {
856 let mut array_builder = data_type.create_array_builder(chunk_size);
857 for j in 0..chunk_size {
858 let offset = ((chunk_offset + 1) * (j + 1)) as u64;
859 match data_type {
860 DataType::Varchar => {
861 let datum =
862 FieldGeneratorImpl::with_varchar(varchar_properties, Self::SEED)
863 .generate_datum(offset);
864 array_builder.append(&datum);
865 }
866 DataType::Timestamp => {
867 let datum =
868 FieldGeneratorImpl::with_timestamp(None, None, None, Self::SEED)
869 .expect("create timestamp generator should succeed")
870 .generate_datum(offset);
871 array_builder.append(datum);
872 }
873 DataType::Timestamptz => {
874 let datum =
875 FieldGeneratorImpl::with_timestamptz(None, None, None, Self::SEED)
876 .expect("create timestamptz generator should succeed")
877 .generate_datum(offset);
878 array_builder.append(datum);
879 }
880 _ if data_type.is_numeric() => {
881 let mut data_gen = FieldGeneratorImpl::with_number_random(
882 data_type.clone(),
883 None,
884 None,
885 Self::SEED,
886 )
887 .unwrap();
888 let datum = data_gen.generate_datum(offset);
889 array_builder.append(datum);
890 }
891 _ => todo!("unsupported type: {data_type:?}"),
892 }
893 }
894 columns.push(array_builder.finish().into());
895 }
896 DataChunk::new(columns, vis)
897 }
898
899 fn gen_data_chunks(
900 num_of_chunks: usize,
901 chunk_size: usize,
902 data_types: &[DataType],
903 varchar_properties: &VarcharProperty,
904 visibility_percent: f64,
905 ) -> Vec<Self> {
906 (0..num_of_chunks)
907 .map(|i| {
908 Self::gen_data_chunk(
909 i,
910 chunk_size,
911 data_types,
912 varchar_properties,
913 visibility_percent,
914 )
915 })
916 .collect()
917 }
918}
919
920#[cfg(test)]
921mod tests {
922 use crate::array::*;
923 use crate::row::Row;
924
/// Rechunking `num_chunks` chunks of `chunk_size` consecutive integers must yield chunks
/// of `new_chunk_size` rows (plus one smaller tail chunk, if any) holding the same
/// integers in the same order.
#[test]
fn test_rechunk() {
    let test_case = |num_chunks: usize, chunk_size: usize, new_chunk_size: usize| {
        // Input: chunk k holds the integers [k * chunk_size, (k + 1) * chunk_size).
        let chunks: Vec<DataChunk> = (0..num_chunks)
            .map(|chunk_idx| {
                let mut builder = PrimitiveArrayBuilder::<i32>::new(0);
                let start = chunk_size * chunk_idx;
                for i in start..start + chunk_size {
                    builder.append(Some(i as i32));
                }
                DataChunk::new(vec![Arc::new(builder.finish().into())], chunk_size)
            })
            .collect();

        // Expected output sizes: as many full chunks as fit, then the remainder.
        let total_size = num_chunks * chunk_size;
        let mut chunk_sizes = vec![new_chunk_size; total_size / new_chunk_size];
        let remainder = total_size % new_chunk_size;
        if remainder != 0 {
            chunk_sizes.push(remainder);
        }

        let new_chunks = DataChunk::rechunk(&chunks, new_chunk_size).unwrap();
        assert_eq!(new_chunks.len(), chunk_sizes.len());
        for (idx, chunk_size) in chunk_sizes.iter().enumerate() {
            assert_eq!(*chunk_size, new_chunks[idx].capacity());
        }

        // Walking the output chunks in order must enumerate 0..total_size.
        let mut expected = 0i32;
        for (idx, &size) in chunk_sizes.iter().enumerate() {
            for row in 0..size {
                assert_eq!(
                    new_chunks[idx]
                        .column_at(0)
                        .as_int32()
                        .value_at(row)
                        .unwrap(),
                    expected
                );
                expected += 1;
            }
        }
    };

    test_case(0, 0, 1);
    test_case(0, 10, 1);
    test_case(10, 0, 1);
    test_case(1, 1, 6);
    test_case(1, 10, 11);
    test_case(2, 3, 6);
    test_case(5, 5, 6);
    test_case(10, 10, 7);
}
981
/// Every row yielded by `rows()` must expose, in column `i`, the constant `i` that the
/// column was filled with.
#[test]
fn test_chunk_iter() {
    let num_of_columns: usize = 2;
    let length = 5;

    let columns: Vec<ArrayRef> = (0..num_of_columns)
        .map(|i| {
            let mut builder = PrimitiveArrayBuilder::<i32>::new(length);
            for _ in 0..length {
                builder.append(Some(i as i32));
            }
            Arc::new(builder.finish().into())
        })
        .collect();

    let chunk: DataChunk = DataChunk::new(columns, length);
    for row in chunk.rows() {
        for i in 0..num_of_columns {
            let val = row.datum_at(i).unwrap();
            assert_eq!(val.into_int32(), i as i32);
        }
    }
}
1003
/// Pretty-printing a two-column chunk yields a bordered table with no separators
/// between rows and with NULLs shown as empty cells.
#[test]
fn test_to_pretty_string() {
    let chunk = DataChunk::new(
        vec![
            Arc::new(I64Array::from_iter([1, 2, 3, 4]).into()),
            Arc::new(I64Array::from_iter([Some(6), None, Some(7), None]).into()),
        ],
        4,
    );
    // A NULL cell has empty content but is still padded to the column width, so every
    // row is exactly as wide as the border line (9 characters here).
    assert_eq!(
        chunk.to_pretty().to_string(),
        "\
+---+---+
| 1 | 6 |
| 2 |   |
| 3 | 7 |
| 4 |   |
+---+---+"
    );
}
1024
/// A chunk without columns still has rows, and its row count survives a protobuf
/// round trip.
#[test]
fn test_no_column_chunk() {
    let chunk = DataChunk::new_dummy(10);
    assert_eq!(chunk.rows().count(), 10);

    let encoded = chunk.to_protobuf();
    let chunk_after_serde = DataChunk::from_protobuf(&encoded).unwrap();
    assert_eq!(chunk_after_serde.rows().count(), 10);
    assert_eq!(chunk_after_serde.cardinality(), 10);
}
1034
/// `project` must reorder, drop and keep columns as requested, and projecting onto no
/// columns must keep the row count.
#[test]
fn reorder_columns() {
    let chunk = DataChunk::from_pretty(
        "I I I
         2 5 1
         4 9 2
         6 9 3",
    );

    // Reversed column order.
    let reversed = DataChunk::from_pretty(
        "I I I
         1 5 2
         2 9 4
         3 9 6",
    );
    assert_eq!(chunk.project(&[2, 1, 0]), reversed);

    // A subset of the columns, in a different order.
    let subset = DataChunk::from_pretty(
        "I I
         1 2
         2 4
         3 6",
    );
    assert_eq!(chunk.project(&[2, 0]), subset);

    // The identity projection and the empty projection.
    assert_eq!(chunk.project(&[0, 1, 2]), chunk);
    assert_eq!(chunk.project(&[]).cardinality(), 3);
}
1064
/// Pins the estimated heap size of two small int64 chunks (three and two columns of
/// three rows each).
#[test]
fn test_chunk_estimated_size() {
    let three_columns = DataChunk::from_pretty(
        "I I I
         1 5 2
         2 9 4
         3 9 6",
    );
    assert_eq!(72, three_columns.estimated_heap_size());

    let two_columns = DataChunk::from_pretty(
        "I I
         1 2
         2 4
         3 6",
    );
    assert_eq!(48, two_columns.estimated_heap_size());
}
1088}