1use std::borrow::Cow;
16use std::fmt;
17use std::fmt::Display;
18use std::hash::BuildHasher;
19use std::sync::Arc;
20
21use bytes::Bytes;
22use either::Either;
23use itertools::Itertools;
24use rand::rngs::SmallRng;
25use rand::{Rng, SeedableRng};
26use risingwave_common_estimate_size::EstimateSize;
27use risingwave_pb::data::PbDataChunk;
28
29use super::{Array, ArrayImpl, ArrayRef, ArrayResult, StructArray};
30use crate::array::ArrayBuilderImpl;
31use crate::array::data_chunk_iter::RowRef;
32use crate::bitmap::{Bitmap, BitmapBuilder};
33use crate::field_generator::{FieldGeneratorImpl, VarcharProperty};
34use crate::hash::HashCode;
35use crate::row::Row;
36use crate::types::{DataType, DatumRef, StructType, ToOwnedDatum, ToText};
37use crate::util::chunk_coalesce::DataChunkBuilder;
38use crate::util::hash_util::finalize_hashers;
39use crate::util::iter_util::ZipEqFast;
40use crate::util::value_encoding::{
41 ValueRowSerializer, estimate_serialize_datum_size, serialize_datum_into,
42 try_get_exact_serialize_datum_size,
43};
44
/// A columnar batch of rows: a list of column arrays plus a bitmap that marks
/// which row slots are visible.
///
/// [`DataChunk::new`] asserts that every column has exactly as many entries as
/// the visibility bitmap has bits. [`DataChunk::from_parts`] performs no such
/// check.
#[derive(Clone, PartialEq)]
#[must_use]
pub struct DataChunk {
    /// The column arrays, kept behind a shared `Arc` slice so that cloning the
    /// chunk only bumps a reference count.
    columns: Arc<[ArrayRef]>,
    /// One bit per row slot; a set bit means the row is visible.
    visibility: Bitmap,
}
70
71impl DataChunk {
    /// Preset string handed to `comfy_table`'s `load_preset` by
    /// [`DataChunk::to_pretty`]; it selects the border characters of the
    /// rendered table.
    pub(crate) const PRETTY_TABLE_PRESET: &'static str = "||--+-++|    ++++++";
73
74 pub fn new(columns: Vec<ArrayRef>, visibility: impl Into<Bitmap>) -> Self {
78 let visibility = visibility.into();
79 let capacity = visibility.len();
80 for column in &columns {
81 assert_eq!(capacity, column.len());
82 }
83
84 DataChunk {
85 columns: columns.into(),
86 visibility,
87 }
88 }
89
90 pub fn new_dummy(cardinality: usize) -> Self {
92 DataChunk {
93 columns: Arc::new([]),
94 visibility: Bitmap::ones(cardinality),
95 }
96 }
97
98 pub fn from_rows(rows: &[impl Row], data_types: &[DataType]) -> Self {
105 let mut builder = DataChunkBuilder::new(data_types.to_vec(), rows.len() + 1);
110
111 for row in rows {
112 let none = builder.append_one_row(row);
113 debug_assert!(none.is_none());
114 }
115
116 builder.consume_all().expect("chunk should not be empty")
117 }
118
    /// Looks up the next set bit of the visibility bitmap, searching from
    /// `row_idx`, and returns its index, or `None` if there is none.
    ///
    /// NOTE(review): whether `row_idx` itself is included in the search is
    /// decided by `Bitmap::next_set_bit` — confirm there.
    pub fn next_visible_row_idx(&self, row_idx: usize) -> Option<usize> {
        self.visibility.next_set_bit(row_idx)
    }
123
124 pub fn into_parts(self) -> (Vec<ArrayRef>, Bitmap) {
125 (self.columns.to_vec(), self.visibility)
126 }
127
128 pub fn into_parts_v2(self) -> (Arc<[ArrayRef]>, Bitmap) {
129 (self.columns, self.visibility)
130 }
131
132 pub fn from_parts(columns: Arc<[ArrayRef]>, visibilities: Bitmap) -> Self {
133 Self {
134 columns,
135 visibility: visibilities,
136 }
137 }
138
139 pub fn dimension(&self) -> usize {
140 self.columns.len()
141 }
142
143 pub fn cardinality(&self) -> usize {
146 self.visibility.count_ones()
147 }
148
149 pub fn capacity(&self) -> usize {
152 self.visibility.len()
153 }
154
155 pub fn selectivity(&self) -> f64 {
156 if self.visibility.is_empty() {
157 0.0
158 } else if self.visibility.all() {
159 1.0
160 } else {
161 self.visibility.count_ones() as f64 / self.visibility.len() as f64
162 }
163 }
164
165 pub fn with_visibility(&self, visibility: impl Into<Bitmap>) -> Self {
166 DataChunk {
167 columns: self.columns.clone(),
168 visibility: visibility.into(),
169 }
170 }
171
    /// Returns a reference to the visibility bitmap.
    pub fn visibility(&self) -> &Bitmap {
        &self.visibility
    }
175
176 pub fn set_visibility(&mut self, visibility: Bitmap) {
177 assert_eq!(visibility.len(), self.capacity());
178 self.visibility = visibility;
179 }
180
181 pub fn is_compacted(&self) -> bool {
182 self.visibility.all()
183 }
184
185 pub fn column_at(&self, idx: usize) -> &ArrayRef {
186 &self.columns[idx]
187 }
188
189 pub fn columns(&self) -> &[ArrayRef] {
190 &self.columns
191 }
192
193 pub fn data_types(&self) -> Vec<DataType> {
195 self.columns.iter().map(|col| col.data_type()).collect()
196 }
197
198 pub fn split_column_at(&self, idx: usize) -> (Self, Self) {
204 let (left, right) = self.columns.split_at(idx);
205 let left = DataChunk::new(left.to_vec(), self.visibility.clone());
206 let right = DataChunk::new(right.to_vec(), self.visibility.clone());
207 (left, right)
208 }
209
210 pub fn to_protobuf(&self) -> PbDataChunk {
211 assert!(self.visibility.all(), "must be compacted before transfer");
212 let mut proto = PbDataChunk {
213 cardinality: self.cardinality() as u32,
214 columns: Default::default(),
215 };
216 let column_ref = &mut proto.columns;
217 for array in &*self.columns {
218 column_ref.push(array.to_protobuf());
219 }
220 proto
221 }
222
223 pub fn compact(self) -> Self {
237 if self.visibility.all() {
238 return self;
239 }
240 let cardinality = self.visibility.count_ones();
241 let columns = self
242 .columns
243 .iter()
244 .map(|col| {
245 let array = col;
246 array.compact(&self.visibility, cardinality).into()
247 })
248 .collect::<Vec<_>>();
249 Self::new(columns, Bitmap::ones(cardinality))
250 }
251
252 pub fn uncompact(self, vis: Bitmap) -> Self {
254 let mut uncompact_builders: Vec<_> = self
255 .columns
256 .iter()
257 .map(|c| c.create_builder(vis.len()))
258 .collect();
259 let mut last_u = None;
260
261 for (idx, u) in vis.iter_ones().enumerate() {
262 let zeros = if let Some(last_u) = last_u {
264 u - last_u - 1
265 } else {
266 u
267 };
268 for _ in 0..zeros {
269 uncompact_builders
270 .iter_mut()
271 .for_each(|builder| builder.append_null());
272 }
273 uncompact_builders
274 .iter_mut()
275 .zip_eq_fast(self.columns.iter())
276 .for_each(|(builder, c)| builder.append(c.datum_at(idx)));
277 last_u = Some(u);
278 }
279 let zeros = if let Some(last_u) = last_u {
280 vis.len() - last_u - 1
281 } else {
282 vis.len()
283 };
284 for _ in 0..zeros {
285 uncompact_builders
286 .iter_mut()
287 .for_each(|builder| builder.append_null());
288 }
289 let array: Vec<_> = uncompact_builders
290 .into_iter()
291 .map(|builder| Arc::new(builder.finish()))
292 .collect();
293
294 Self::new(array, vis)
295 }
296
297 pub fn compact_cow(&self) -> Cow<'_, Self> {
301 if self.visibility.all() {
302 return Cow::Borrowed(self);
303 }
304 let cardinality = self.visibility.count_ones();
305 let columns = self
306 .columns
307 .iter()
308 .map(|col| {
309 let array = col;
310 array.compact(&self.visibility, cardinality).into()
311 })
312 .collect::<Vec<_>>();
313 Cow::Owned(Self::new(columns, Bitmap::ones(cardinality)))
314 }
315
316 pub fn from_protobuf(proto: &PbDataChunk) -> ArrayResult<Self> {
317 let mut columns = vec![];
318 for any_col in proto.get_columns() {
319 let cardinality = proto.get_cardinality() as usize;
320 columns.push(ArrayImpl::from_protobuf(any_col, cardinality)?.into());
321 }
322
323 let chunk = DataChunk::new(columns, proto.cardinality as usize);
324 Ok(chunk)
325 }
326
327 pub fn rechunk(chunks: &[DataChunk], each_size_limit: usize) -> ArrayResult<Vec<DataChunk>> {
331 let Some(data_types) = chunks.first().map(|c| c.data_types()) else {
332 return Ok(Vec::new());
333 };
334
335 let mut builder = DataChunkBuilder::new(data_types, each_size_limit);
336 let mut outputs = Vec::new();
337
338 for chunk in chunks {
339 for output in builder.append_chunk(chunk.clone()) {
340 outputs.push(output);
341 }
342 }
343 if let Some(output) = builder.consume_all() {
344 outputs.push(output);
345 }
346
347 Ok(outputs)
348 }
349
350 pub fn get_hash_values<H: BuildHasher>(
353 &self,
354 column_idxes: &[usize],
355 hasher_builder: H,
356 ) -> Vec<HashCode<H>> {
357 let len = self.capacity();
358 let mut states = Vec::with_capacity(len);
359 states.resize_with(len, || hasher_builder.build_hasher());
360 for column_idx in column_idxes {
362 let array = self.column_at(*column_idx);
363 array.hash_vec(&mut states[..], self.visibility());
364 }
365 finalize_hashers(&states[..])
366 .into_iter()
367 .map(|hash_code| hash_code.into())
368 .collect_vec()
369 }
370
371 pub fn row_at(&self, pos: usize) -> (RowRef<'_>, bool) {
377 let row = self.row_at_unchecked_vis(pos);
378 let vis = self.visibility.is_set(pos);
379 (row, vis)
380 }
381
    /// Returns a reference to the row at `pos` without consulting the
    /// visibility bitmap.
    pub fn row_at_unchecked_vis(&self, pos: usize) -> RowRef<'_> {
        RowRef::new(self, pos)
    }
389
390 pub fn to_pretty(&self) -> impl Display + use<> {
392 use comfy_table::Table;
393
394 if self.cardinality() == 0 {
395 return Either::Left("(empty)");
396 }
397
398 let mut table = Table::new();
399 table.load_preset(Self::PRETTY_TABLE_PRESET);
400
401 for row in self.rows() {
402 let cells: Vec<_> = row
403 .iter()
404 .map(|v| {
405 match v {
406 None => "".to_owned(), Some(scalar) => scalar.to_text(),
408 }
409 })
410 .collect();
411 table.add_row(cells);
412 }
413
414 Either::Right(table)
415 }
416
417 pub fn keep_columns(&self, column_indices: &[usize]) -> Self {
427 let capacity: usize = self.capacity();
428 let columns = (self.columns.iter().enumerate())
429 .map(|(i, column)| {
430 if column_indices.contains(&i) {
431 column.clone()
432 } else {
433 let mut builder = column.create_builder(capacity);
434 builder.append_n(capacity, None as DatumRef<'_>);
435 builder.finish().into()
436 }
437 })
438 .collect();
439 DataChunk {
440 columns,
441 visibility: self.visibility.clone(),
442 }
443 }
444
445 pub fn project(&self, indices: &[usize]) -> Self {
451 Self {
452 columns: indices.iter().map(|i| self.columns[*i].clone()).collect(),
453 visibility: self.visibility.clone(),
454 }
455 }
456
457 pub fn project_with_vis(&self, indices: &[usize], visibility: Bitmap) -> Self {
459 assert_eq!(visibility.len(), self.capacity());
460 Self {
461 columns: indices.iter().map(|i| self.columns[*i].clone()).collect(),
462 visibility,
463 }
464 }
465
466 pub fn reorder_rows(&self, indexes: &[usize]) -> Self {
468 let mut array_builders: Vec<ArrayBuilderImpl> = self
469 .columns
470 .iter()
471 .map(|col| col.create_builder(indexes.len()))
472 .collect();
473 for &i in indexes {
474 for (builder, col) in array_builders.iter_mut().zip_eq_fast(self.columns.iter()) {
475 builder.append(col.value_at(i));
476 }
477 }
478 let columns = array_builders
479 .into_iter()
480 .map(|builder| builder.finish().into())
481 .collect();
482 DataChunk::new(columns, indexes.len())
483 }
484
485 fn partition_sizes(&self) -> (usize, Vec<&ArrayRef>) {
494 let mut col_variable: Vec<&ArrayRef> = vec![];
495 let mut row_len_fixed: usize = 0;
496 for c in &*self.columns {
497 if let Some(field_len) = try_get_exact_serialize_datum_size(c) {
498 row_len_fixed += field_len;
499 } else {
500 col_variable.push(c);
501 }
502 }
503 (row_len_fixed, col_variable)
504 }
505
506 unsafe fn compute_size_of_variable_cols_in_row(
507 variable_cols: &[&ArrayRef],
508 row_idx: usize,
509 ) -> usize {
510 unsafe {
511 variable_cols
512 .iter()
513 .map(|col| estimate_serialize_datum_size(col.value_at_unchecked(row_idx)))
514 .sum::<usize>()
515 }
516 }
517
518 unsafe fn init_buffer(
519 row_len_fixed: usize,
520 variable_cols: &[&ArrayRef],
521 row_idx: usize,
522 ) -> Vec<u8> {
523 unsafe {
524 Vec::with_capacity(
525 row_len_fixed + Self::compute_size_of_variable_cols_in_row(variable_cols, row_idx),
526 )
527 }
528 }
529
530 pub fn serialize(&self) -> Vec<Bytes> {
537 let buffers = if !self.visibility.all() {
538 let rows_num = self.visibility.len();
539 let mut buffers: Vec<Vec<u8>> = vec![];
540 let (row_len_fixed, col_variable) = self.partition_sizes();
541
542 for i in 0..rows_num {
544 unsafe {
546 if self.visibility.is_set_unchecked(i) {
547 buffers.push(Self::init_buffer(row_len_fixed, &col_variable, i));
548 } else {
549 buffers.push(vec![]);
550 }
551 }
552 }
553
554 for c in &*self.columns {
556 assert_eq!(c.len(), rows_num);
557 for (i, buffer) in buffers.iter_mut().enumerate() {
558 unsafe {
560 if self.visibility.is_set_unchecked(i) {
561 serialize_datum_into(c.value_at_unchecked(i), buffer);
562 }
563 }
564 }
565 }
566 buffers
567 } else {
568 let mut buffers: Vec<Vec<u8>> = vec![];
569 let (row_len_fixed, col_variable) = self.partition_sizes();
570 for i in 0..self.visibility.len() {
571 unsafe {
572 buffers.push(Self::init_buffer(row_len_fixed, &col_variable, i));
573 }
574 }
575 for c in &*self.columns {
576 assert_eq!(c.len(), self.visibility.len());
577 for (i, buffer) in buffers.iter_mut().enumerate() {
578 unsafe {
580 serialize_datum_into(c.value_at_unchecked(i), buffer);
581 }
582 }
583 }
584 buffers
585 };
586
587 buffers.into_iter().map(|item| item.into()).collect_vec()
588 }
589
590 pub fn serialize_with(&self, serializer: &impl ValueRowSerializer) -> Vec<Bytes> {
595 let mut results = Vec::with_capacity(self.capacity());
596 for row in self.rows_with_holes() {
597 results.push(if let Some(row) = row {
598 serializer.serialize(row).into()
599 } else {
600 Bytes::new()
601 });
602 }
603 results
604 }
605
606 pub fn estimate_value_encoding_size(&self, column_indices: &[usize]) -> usize {
609 if self.capacity() == 0 {
610 0
611 } else {
612 column_indices
613 .iter()
614 .map(|idx| {
615 let datum = self.column_at(*idx).datum_at(0);
616 estimate_serialize_datum_size(datum)
617 })
618 .sum()
619 }
620 }
621}
622
623impl fmt::Debug for DataChunk {
624 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
625 write!(
626 f,
627 "DataChunk {{ cardinality = {}, capacity = {}, data = \n{} }}",
628 self.cardinality(),
629 self.capacity(),
630 self.to_pretty()
631 )
632 }
633}
634
635impl<'a> From<&'a StructArray> for DataChunk {
636 fn from(array: &'a StructArray) -> Self {
637 Self {
638 columns: array.fields().cloned().collect(),
639 visibility: Bitmap::ones(array.len()),
640 }
641 }
642}
643
644impl EstimateSize for DataChunk {
645 fn estimated_heap_size(&self) -> usize {
646 self.columns
647 .iter()
648 .map(|a| a.estimated_heap_size())
649 .sum::<usize>()
650 + self.visibility.estimated_heap_size()
651 }
652}
653
/// Test helpers for building and checking chunks.
pub trait DataChunkTestExt {
    /// Seed handed to the field generators used by `gen_data_chunk`.
    const SEED: u64 = 0xFF67FEABBAEF76FF;

    /// Parses a chunk from a textual table.
    ///
    /// The first non-blank line is a header of whitespace-separated type
    /// tokens; every following non-blank line is one row of
    /// whitespace-separated values. In the `DataChunk` implementation `.`
    /// denotes NULL, `(empty)` an empty string, a trailing `D` after the
    /// values marks the row as invisible, and `//` starts a trailing comment.
    fn from_pretty(s: &str) -> Self;

    /// Returns a copy of the chunk with an invisible NULL row inserted after
    /// every original row.
    fn with_invisible_holes(self) -> Self
    where
        Self: Sized;

    /// Panics if a column's length differs from the visibility length.
    fn assert_valid(&self);

    /// Generates one chunk of `chunk_size` rows with the given `data_types`.
    ///
    /// `chunk_offset` feeds into the per-row generator offset,
    /// `string_properties` configures varchar generation, and
    /// `visibility_ratio` is the probability that a row is visible.
    fn gen_data_chunk(
        chunk_offset: usize,
        chunk_size: usize,
        data_types: &[DataType],
        string_properties: &VarcharProperty,
        visibility_ratio: f64,
    ) -> Self;

    /// Generates `num_of_chunks` chunks via `gen_data_chunk`, using the chunk
    /// index as `chunk_offset`.
    fn gen_data_chunks(
        num_of_chunks: usize,
        chunk_size: usize,
        data_types: &[DataType],
        string_properties: &VarcharProperty,
        visibility_ratio: f64,
    ) -> Vec<Self>
    where
        Self: Sized;
}
720
721impl DataChunkTestExt for DataChunk {
    /// Parses a chunk from a textual table (header line of type tokens, then
    /// one line per row).
    ///
    /// # Panics
    ///
    /// Panics if `s` has no non-blank line, on an unsupported type token, on
    /// a value that cannot be parsed for its column type, or on an unexpected
    /// token after the values of a row.
    fn from_pretty(s: &str) -> Self {
        use crate::types::ScalarImpl;
        /// Maps a header token to a data type. A `[]` suffix wraps the type
        /// in a list; `<a,b,...>` builds an unnamed struct.
        fn parse_type(s: &str) -> DataType {
            if let Some(s) = s.strip_suffix("[]") {
                return DataType::List(Box::new(parse_type(s)));
            }
            match s {
                "B" => DataType::Boolean,
                "I" => DataType::Int64,
                "i" => DataType::Int32,
                "F" => DataType::Float64,
                "f" => DataType::Float32,
                "TS" => DataType::Timestamp,
                "TZ" => DataType::Timestamptz,
                "T" => DataType::Varchar,
                "SRL" => DataType::Serial,
                "D" => DataType::Date,
                array if array.starts_with('<') && array.ends_with('>') => {
                    DataType::Struct(StructType::unnamed(
                        array[1..array.len() - 1]
                            .split(',')
                            .map(parse_type)
                            .collect(),
                    ))
                }
                _ => todo!("unsupported type: {s:?}"),
            }
        }

        // Blank lines are ignored everywhere.
        let mut lines = s.split('\n').filter(|l| !l.trim().is_empty());
        let header = lines.next().unwrap().trim();
        // Header tokens up to an optional `//` comment define the columns.
        let datatypes = header
            .split_ascii_whitespace()
            .take_while(|c| *c != "//")
            .map(parse_type)
            .collect::<Vec<_>>();
        let mut array_builders = datatypes
            .iter()
            .map(|ty| ty.create_array_builder(1))
            .collect::<Vec<_>>();
        let mut visibility = vec![];
        for line in lines {
            let mut token = line.trim().split_ascii_whitespace();
            // Zipping with `&mut token` consumes exactly one token per column
            // and leaves the remaining tokens for the visibility check below.
            #[allow(clippy::disallowed_methods)]
            for ((builder, ty), val_str) in
                array_builders.iter_mut().zip(&datatypes).zip(&mut token)
            {
                let datum = match val_str {
                    "." => None,
                    "(empty)" => Some("".into()),
                    _ => Some(ScalarImpl::from_text(val_str, ty).unwrap()),
                };
                builder.append(datum);
            }
            // The first token after the values decides the row's visibility.
            let visible = match token.next() {
                None | Some("//") => true,
                Some("D") => false,
                Some(t) => panic!("invalid token: {t:?}"),
            };
            visibility.push(visible);
        }
        let columns = array_builders
            .into_iter()
            .map(|builder| builder.finish().into())
            .collect();
        let vis = Bitmap::from_iter(visibility);
        let chunk = DataChunk::new(columns, vis);
        chunk.assert_valid();
        chunk
    }
794
795 fn with_invisible_holes(self) -> Self
796 where
797 Self: Sized,
798 {
799 let (cols, vis) = self.into_parts();
800 let n = vis.len();
801 let mut new_vis = BitmapBuilder::with_capacity(n * 2);
802 for b in vis.iter() {
803 new_vis.append(b);
804 new_vis.append(false);
805 }
806 let new_cols = cols
807 .into_iter()
808 .map(|col| {
809 let arr = col;
810 let mut builder = arr.create_builder(n * 2);
811 for v in arr.iter() {
812 builder.append(v.to_owned_datum());
813 builder.append_null();
814 }
815
816 builder.finish().into()
817 })
818 .collect();
819 let chunk = DataChunk::new(new_cols, new_vis.finish());
820 chunk.assert_valid();
821 chunk
822 }
823
824 fn assert_valid(&self) {
825 let cols = self.columns();
826 let vis = &self.visibility;
827 let n = vis.len();
828 for col in cols {
829 assert_eq!(col.len(), n);
830 }
831 }
832
833 fn gen_data_chunk(
834 chunk_offset: usize,
835 chunk_size: usize,
836 data_types: &[DataType],
837 varchar_properties: &VarcharProperty,
838 visibility_percent: f64,
839 ) -> Self {
840 let vis = if visibility_percent == 0.0 {
841 Bitmap::zeros(chunk_size)
842 } else if visibility_percent == 1.0 {
843 Bitmap::ones(chunk_size)
844 } else {
845 let mut rng = SmallRng::from_seed([0; 32]);
846 let mut vis_builder = BitmapBuilder::with_capacity(chunk_size);
847 for _i in 0..chunk_size {
848 vis_builder.append(rng.random_bool(visibility_percent));
849 }
850 vis_builder.finish()
851 };
852
853 let mut columns = Vec::new();
854 for data_type in data_types {
856 let mut array_builder = data_type.create_array_builder(chunk_size);
857 for j in 0..chunk_size {
858 let offset = ((chunk_offset + 1) * (j + 1)) as u64;
859 match data_type {
860 DataType::Varchar => {
861 let datum =
862 FieldGeneratorImpl::with_varchar(varchar_properties, Self::SEED)
863 .generate_datum(offset);
864 array_builder.append(&datum);
865 }
866 DataType::Timestamp => {
867 let datum =
868 FieldGeneratorImpl::with_timestamp(None, None, None, Self::SEED)
869 .expect("create timestamp generator should succeed")
870 .generate_datum(offset);
871 array_builder.append(datum);
872 }
873 DataType::Timestamptz => {
874 let datum =
875 FieldGeneratorImpl::with_timestamptz(None, None, None, Self::SEED)
876 .expect("create timestamptz generator should succeed")
877 .generate_datum(offset);
878 array_builder.append(datum);
879 }
880 _ if data_type.is_numeric() => {
881 let mut data_gen = FieldGeneratorImpl::with_number_random(
882 data_type.clone(),
883 None,
884 None,
885 Self::SEED,
886 )
887 .unwrap();
888 let datum = data_gen.generate_datum(offset);
889 array_builder.append(datum);
890 }
891 _ => todo!("unsupported type: {data_type:?}"),
892 }
893 }
894 columns.push(array_builder.finish().into());
895 }
896 DataChunk::new(columns, vis)
897 }
898
899 fn gen_data_chunks(
900 num_of_chunks: usize,
901 chunk_size: usize,
902 data_types: &[DataType],
903 varchar_properties: &VarcharProperty,
904 visibility_percent: f64,
905 ) -> Vec<Self> {
906 (0..num_of_chunks)
907 .map(|i| {
908 Self::gen_data_chunk(
909 i,
910 chunk_size,
911 data_types,
912 varchar_properties,
913 visibility_percent,
914 )
915 })
916 .collect()
917 }
918}
919
920#[cfg(test)]
921mod tests {
922 use crate::array::*;
923 use crate::row::Row;
924
925 #[test]
926 fn test_rechunk() {
927 let test_case = |num_chunks: usize, chunk_size: usize, new_chunk_size: usize| {
928 let mut chunks = vec![];
929 for chunk_idx in 0..num_chunks {
930 let mut builder = PrimitiveArrayBuilder::<i32>::new(0);
931 for i in chunk_size * chunk_idx..chunk_size * (chunk_idx + 1) {
932 builder.append(Some(i as i32));
933 }
934 let chunk = DataChunk::new(vec![Arc::new(builder.finish().into())], chunk_size);
935 chunks.push(chunk);
936 }
937
938 let total_size = num_chunks * chunk_size;
939 let num_full_new_chunk = total_size / new_chunk_size;
940 let mut chunk_sizes = vec![new_chunk_size; num_full_new_chunk];
941 let remainder = total_size % new_chunk_size;
942 if remainder != 0 {
943 chunk_sizes.push(remainder);
944 }
945
946 let new_chunks = DataChunk::rechunk(&chunks, new_chunk_size).unwrap();
947 assert_eq!(new_chunks.len(), chunk_sizes.len());
948 for (idx, chunk_size) in chunk_sizes.iter().enumerate() {
950 assert_eq!(*chunk_size, new_chunks[idx].capacity());
951 }
952
953 let mut chunk_idx = 0;
954 let mut cur_idx = 0;
955 for val in 0..total_size {
956 if cur_idx >= chunk_sizes[chunk_idx] {
957 cur_idx = 0;
958 chunk_idx += 1;
959 }
960 assert_eq!(
961 new_chunks[chunk_idx]
962 .column_at(0)
963 .as_int32()
964 .value_at(cur_idx)
965 .unwrap(),
966 val as i32
967 );
968 cur_idx += 1;
969 }
970 };
971
972 test_case(0, 0, 1);
973 test_case(0, 10, 1);
974 test_case(10, 0, 1);
975 test_case(1, 1, 6);
976 test_case(1, 10, 11);
977 test_case(2, 3, 6);
978 test_case(5, 5, 6);
979 test_case(10, 10, 7);
980 }
981
982 #[test]
983 fn test_chunk_iter() {
984 let num_of_columns: usize = 2;
985 let length = 5;
986 let mut columns = vec![];
987 for i in 0..num_of_columns {
988 let mut builder = PrimitiveArrayBuilder::<i32>::new(length);
989 for _ in 0..length {
990 builder.append(Some(i as i32));
991 }
992 let arr = builder.finish();
993 columns.push(Arc::new(arr.into()))
994 }
995 let chunk: DataChunk = DataChunk::new(columns, length);
996 for row in chunk.rows() {
997 for i in 0..num_of_columns {
998 let val = row.datum_at(i).unwrap();
999 assert_eq!(val.into_int32(), i as i32);
1000 }
1001 }
1002 }
1003
1004 #[test]
1005 fn test_to_pretty_string() {
1006 let chunk = DataChunk::new(
1007 vec![
1008 Arc::new(I64Array::from_iter([1, 2, 3, 4]).into()),
1009 Arc::new(I64Array::from_iter([Some(6), None, Some(7), None]).into()),
1010 ],
1011 4,
1012 );
1013 assert_eq!(
1014 chunk.to_pretty().to_string(),
1015 "\
1016+---+---+
1017| 1 | 6 |
1018| 2 | |
1019| 3 | 7 |
1020| 4 | |
1021+---+---+"
1022 );
1023 }
1024
1025 #[test]
1026 fn test_no_column_chunk() {
1027 let chunk = DataChunk::new_dummy(10);
1028 assert_eq!(chunk.rows().count(), 10);
1029
1030 let chunk_after_serde = DataChunk::from_protobuf(&chunk.to_protobuf()).unwrap();
1031 assert_eq!(chunk_after_serde.rows().count(), 10);
1032 assert_eq!(chunk_after_serde.cardinality(), 10);
1033 }
1034
1035 #[test]
1036 fn reorder_columns() {
1037 let chunk = DataChunk::from_pretty(
1038 "I I I
1039 2 5 1
1040 4 9 2
1041 6 9 3",
1042 );
1043 assert_eq!(
1044 chunk.project(&[2, 1, 0]),
1045 DataChunk::from_pretty(
1046 "I I I
1047 1 5 2
1048 2 9 4
1049 3 9 6",
1050 )
1051 );
1052 assert_eq!(
1053 chunk.project(&[2, 0]),
1054 DataChunk::from_pretty(
1055 "I I
1056 1 2
1057 2 4
1058 3 6",
1059 )
1060 );
1061 assert_eq!(chunk.project(&[0, 1, 2]), chunk);
1062 assert_eq!(chunk.project(&[]).cardinality(), 3);
1063 }
1064
1065 #[test]
1066 fn test_chunk_estimated_size() {
1067 assert_eq!(
1068 72,
1069 DataChunk::from_pretty(
1070 "I I I
1071 1 5 2
1072 2 9 4
1073 3 9 6",
1074 )
1075 .estimated_heap_size()
1076 );
1077 assert_eq!(
1078 48,
1079 DataChunk::from_pretty(
1080 "I I
1081 1 2
1082 2 4
1083 3 6",
1084 )
1085 .estimated_heap_size()
1086 );
1087 }
1088}