use std::iter::FusedIterator;
use std::mem::swap;
use super::iter_util::ZipEqDebug;
use crate::array::{ArrayBuilderImpl, ArrayImpl, DataChunk};
use crate::row::Row;
use crate::types::{DataType, ToDatumRef};
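/// A [`DataChunk`] paired with a row offset, representing the not-yet-consumed suffix of a chunk
/// that was only partially appended to a [`DataChunkBuilder`].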
pub struct SlicedDataChunk {
data_chunk: DataChunk,
offset: usize,
}
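/// A builder that coalesces appended rows and chunks into [`DataChunk`]s of at most `batch_size`
/// rows. The underlying array builders are created lazily on the first append.
///
/// A minimal usage sketch (the `input_chunks` iterator and the `send` sink are hypothetical; the
/// data types are the ones used in the tests below):
///
/// ```ignore
/// let mut builder = DataChunkBuilder::new(vec![DataType::Int32, DataType::Int64], 1024);
/// for chunk in input_chunks {
///     // Each batch of exactly 1024 rows is yielded as soon as it is complete.
///     for output in builder.append_chunk(chunk) {
///         send(output);
///     }
/// }
/// // Flush the remaining rows as a final, possibly smaller, chunk.
/// if let Some(rest) = builder.consume_all() {
///     send(rest);
/// }
/// ```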
pub struct DataChunkBuilder {
data_types: Vec<DataType>,
batch_size: usize,
array_builders: Vec<ArrayBuilderImpl>,
buffered_count: usize,
}
impl DataChunkBuilder {
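    /// Creates a builder that produces chunks of the given `data_types` with at most `batch_size`
    /// rows each. Panics if `batch_size` is zero.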
pub fn new(data_types: Vec<DataType>, batch_size: usize) -> Self {
assert!(batch_size > 0);
Self {
data_types,
batch_size,
array_builders: vec![],
buffered_count: 0,
}
}
pub fn batch_size(&self) -> usize {
self.batch_size
}
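    /// Lazily (re)creates the array builders if they are absent, i.e. on the first append after
    /// construction or after a chunk has been built. Nothing may be buffered at that point.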
fn ensure_builders(&mut self) {
if self.array_builders.len() != self.data_types.len() {
self.array_builders = self
.data_types
.iter()
.map(|data_type| data_type.create_array_builder(self.batch_size))
.collect::<Vec<ArrayBuilderImpl>>();
assert!(self.buffered_count == 0);
}
}
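    /// Appends rows from `input_chunk` until either the builder is full or the input is
    /// exhausted. Returns the unconsumed remainder of the input (if any) and a finished output
    /// chunk (if the builder reached `batch_size`).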
#[must_use]
fn append_chunk_inner(
&mut self,
input_chunk: SlicedDataChunk,
) -> (Option<SlicedDataChunk>, Option<DataChunk>) {
self.ensure_builders();
        let mut new_return_offset = input_chunk.offset;
        let vis = input_chunk.data_chunk.visibility();
        if !vis.all() {
            // Slow path: some rows are invisible, so walk the visibility bitmap and append only
            // the visible rows, stopping as soon as the batch is full.
            for vis in vis.iter().skip(input_chunk.offset) {
                new_return_offset += 1;
                if !vis {
                    continue;
                }
                self.append_one_row_internal(&input_chunk.data_chunk, new_return_offset - 1);
                if self.buffered_count >= self.batch_size {
                    break;
                }
            }
        } else {
            // Fast path: all rows are visible, so append a contiguous range bounded by the
            // builder's remaining capacity and the rows left in the input.
            let num_rows_to_append = std::cmp::min(
                self.batch_size - self.buffered_count,
                input_chunk.data_chunk.capacity() - input_chunk.offset,
            );
            let end_offset = input_chunk.offset + num_rows_to_append;
            for input_row_idx in input_chunk.offset..end_offset {
                new_return_offset += 1;
                self.append_one_row_internal(&input_chunk.data_chunk, input_row_idx);
            }
        }
        assert!(self.buffered_count <= self.batch_size);
        // Return the unconsumed suffix of the input, if any, so the caller can feed it back in.
        let returned_input_chunk = if input_chunk.data_chunk.capacity() > new_return_offset {
            Some(input_chunk.with_new_offset_checked(new_return_offset))
        } else {
            None
        };
        // Emit an output chunk only when the batch is full; otherwise keep buffering.
        let output_chunk = if self.buffered_count == self.batch_size {
            Some(self.build_data_chunk())
        } else {
            None
        };
        (returned_input_chunk, output_chunk)
}
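    /// Appends a chunk and returns an iterator over the full chunks produced along the way. Rows
    /// that do not fill a complete batch stay buffered in the builder; the returned iterator must
    /// be exhausted (see the `Drop` impl of [`AppendDataChunk`]).
    ///
    /// A sketch of the expected shape, mirroring `test_append_chunk_iter` below:
    ///
    /// ```ignore
    /// let batch_size = builder.batch_size();
    /// for output in builder.append_chunk(chunk) {
    ///     // Every yielded chunk holds exactly `batch_size` rows.
    ///     assert_eq!(batch_size, output.cardinality());
    /// }
    /// ```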
#[must_use]
pub fn append_chunk(&mut self, data_chunk: DataChunk) -> AppendDataChunk<'_> {
AppendDataChunk {
builder: self,
            remaining: (data_chunk.capacity() > 0)
                .then(|| SlicedDataChunk::new_checked(data_chunk)),
}
}
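    /// Builds and returns whatever is currently buffered as a (possibly non-full) chunk, or
    /// `None` if nothing is buffered.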
#[must_use]
pub fn consume_all(&mut self) -> Option<DataChunk> {
if self.buffered_count > 0 {
Some(self.build_data_chunk())
} else {
None
}
}
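    /// Consumes the builder and returns the buffered rows as a single chunk, which may be empty.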
pub fn finish(mut self) -> DataChunk {
self.build_data_chunk()
}
fn append_one_row_internal(&mut self, data_chunk: &DataChunk, row_idx: usize) {
self.do_append_one_row_from_datums(data_chunk.row_at(row_idx).0.iter());
}
fn do_append_one_row_from_datums(&mut self, datums: impl Iterator<Item = impl ToDatumRef>) {
for (array_builder, datum) in self.array_builders.iter_mut().zip_eq_debug(datums) {
array_builder.append(datum);
}
self.buffered_count += 1;
}
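    /// Appends one row and returns a finished chunk if this row completes the current batch.
    ///
    /// A sketch with a hypothetical source of rows (anything implementing [`Row`] works, e.g. the
    /// rows of another chunk):
    ///
    /// ```ignore
    /// for row in rows {
    ///     if let Some(chunk) = builder.append_one_row(row) {
    ///         // A full batch is ready; hand it off to a hypothetical consumer.
    ///         send(chunk);
    ///     }
    /// }
    /// ```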
#[must_use]
pub fn append_one_row(&mut self, row: impl Row) -> Option<DataChunk> {
self.append_one_row_no_finish(row);
if self.buffered_count == self.batch_size {
Some(self.build_data_chunk())
} else {
None
}
}
fn append_one_row_no_finish(&mut self, row: impl Row) {
assert!(self.buffered_count < self.batch_size);
self.ensure_builders();
self.do_append_one_row_from_datums(row.iter());
}
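    /// Appends one row whose first columns come from `left_arrays` at `left_row_id` and whose
    /// remaining columns come from `right_arrays` at `right_row_id`, e.g. when combining the two
    /// sides of a join. Returns a finished chunk if this row completes the current batch; see
    /// `test_append_one_row_from_array_elements` below for a concrete usage.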
#[must_use]
pub fn append_one_row_from_array_elements<'a, I1, I2>(
&mut self,
left_arrays: I1,
left_row_id: usize,
right_arrays: I2,
right_row_id: usize,
) -> Option<DataChunk>
where
I1: Iterator<Item = &'a ArrayImpl>,
I2: Iterator<Item = &'a ArrayImpl>,
{
assert!(self.buffered_count < self.batch_size);
self.ensure_builders();
for (array_builder, (array, row_id)) in self.array_builders.iter_mut().zip_eq_debug(
left_arrays
.map(|array| (array, left_row_id))
.chain(right_arrays.map(|array| (array, right_row_id))),
) {
array_builder.append_array_element(array, row_id)
}
self.buffered_count += 1;
if self.buffered_count == self.batch_size {
Some(self.build_data_chunk())
} else {
None
}
}
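    /// Takes the current array builders, resets the buffered row count, and assembles the
    /// buffered rows into a [`DataChunk`].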
fn build_data_chunk(&mut self) -> DataChunk {
let mut finished_array_builders = vec![];
swap(&mut finished_array_builders, &mut self.array_builders);
let cardinality = self.buffered_count;
self.buffered_count = 0;
let columns: Vec<_> = finished_array_builders
.into_iter()
.map(|builder| builder.finish().into())
.collect();
DataChunk::new(columns, cardinality)
}
pub fn buffered_count(&self) -> usize {
self.buffered_count
}
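    /// Returns whether `count` more rows still fit into the current batch.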
pub fn can_append(&self, count: usize) -> bool {
self.buffered_count + count <= self.batch_size
}
pub fn num_columns(&self) -> usize {
self.data_types.len()
}
pub fn data_types(&self) -> Vec<DataType> {
self.data_types.clone()
}
pub fn is_empty(&self) -> bool {
self.buffered_count == 0
}
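    /// Discards any buffered rows; the array builders are recreated lazily on the next append.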
pub fn clear(&mut self) {
if !self.is_empty() {
self.array_builders.clear()
}
self.buffered_count = 0;
}
}
impl Drop for DataChunkBuilder {
fn drop(&mut self) {
if self.buffered_count != 0 {
tracing::warn!(
remaining = self.buffered_count,
"dropping non-empty data chunk builder"
);
}
}
}
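/// Iterator returned by [`DataChunkBuilder::append_chunk`]. It yields one full [`DataChunk`] per
/// completed batch, while rows that do not fill a batch remain buffered in the builder.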
pub struct AppendDataChunk<'a> {
builder: &'a mut DataChunkBuilder,
remaining: Option<SlicedDataChunk>,
}
impl Iterator for AppendDataChunk<'_> {
type Item = DataChunk;
fn next(&mut self) -> Option<Self::Item> {
let (remaining, output) = self.builder.append_chunk_inner(self.remaining.take()?);
self.remaining = remaining;
output
}
}
impl FusedIterator for AppendDataChunk<'_> {}
impl Drop for AppendDataChunk<'_> {
fn drop(&mut self) {
if self.remaining.is_some() {
tracing::warn!("dropping `AppendDataChunk` without exhausting it");
}
}
}
impl SlicedDataChunk {
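    /// Creates a slice starting at offset 0. Panics if the chunk has zero capacity.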
pub fn new_checked(data_chunk: DataChunk) -> Self {
SlicedDataChunk::with_offset_checked(data_chunk, 0)
}
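    /// Creates a slice starting at `offset`. Panics if the offset is not within the chunk's
    /// capacity.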
pub fn with_offset_checked(data_chunk: DataChunk, offset: usize) -> Self {
assert!(
offset < data_chunk.capacity(),
"offset {}, data_chunk capacity {}",
offset,
data_chunk.capacity()
);
Self { data_chunk, offset }
}
pub fn with_new_offset_checked(self, new_offset: usize) -> Self {
SlicedDataChunk::with_offset_checked(self.data_chunk, new_offset)
}
}
#[cfg(test)]
mod tests {
use itertools::Itertools;
use crate::array::DataChunk;
use crate::test_prelude::DataChunkTestExt;
use crate::types::{DataType, ScalarImpl};
use crate::util::chunk_coalesce::{DataChunkBuilder, SlicedDataChunk};
#[test]
fn test_append_chunk() {
let mut builder = DataChunkBuilder::new(vec![DataType::Int32, DataType::Int64], 3);
let input = SlicedDataChunk::new_checked(DataChunk::from_pretty(
"i I
3 .
. 7",
));
let (returned_input, output) = builder.append_chunk_inner(input);
assert!(returned_input.is_none());
assert!(output.is_none());
let input = SlicedDataChunk::new_checked(DataChunk::from_pretty(
"i I
3 .
. 7
4 8
. 9",
));
let (returned_input, output) = builder.append_chunk_inner(input);
assert_eq!(Some(1), returned_input.as_ref().map(|c| c.offset));
assert_eq!(Some(3), output.as_ref().map(DataChunk::cardinality));
assert_eq!(Some(3), output.as_ref().map(DataChunk::capacity));
assert!(output.unwrap().is_compacted());
let (returned_input, output) = builder.append_chunk_inner(returned_input.unwrap());
assert!(returned_input.is_none());
assert_eq!(Some(3), output.as_ref().map(DataChunk::cardinality));
assert_eq!(Some(3), output.as_ref().map(DataChunk::capacity));
assert!(output.unwrap().is_compacted());
}
#[test]
fn test_append_chunk_with_bitmap() {
let mut builder = DataChunkBuilder::new(vec![DataType::Int32, DataType::Int64], 3);
let input = SlicedDataChunk::new_checked(DataChunk::from_pretty(
"i I
3 .
. 7 D",
));
let (returned_input, output) = builder.append_chunk_inner(input);
assert!(returned_input.is_none());
assert!(output.is_none());
assert_eq!(1, builder.buffered_count());
let input = SlicedDataChunk::new_checked(DataChunk::from_pretty(
"i I
3 . D
. 7
4 8
. 9 D",
));
let (returned_input, output) = builder.append_chunk_inner(input);
assert_eq!(Some(3), returned_input.as_ref().map(|c| c.offset));
assert_eq!(Some(3), output.as_ref().map(DataChunk::cardinality));
assert_eq!(Some(3), output.as_ref().map(DataChunk::capacity));
assert!(output.unwrap().is_compacted());
assert_eq!(0, builder.buffered_count());
let (returned_input, output) = builder.append_chunk_inner(returned_input.unwrap());
assert!(returned_input.is_none());
assert!(output.is_none());
assert_eq!(0, builder.buffered_count());
}
#[test]
fn test_append_chunk_iter() {
let mut builder = DataChunkBuilder::new(vec![DataType::Int32, DataType::Int64], 3);
let input = DataChunk::from_pretty(
"i I
3 .
. 7",
);
let outputs = builder.append_chunk(input).collect_vec();
assert!(outputs.is_empty());
let input = DataChunk::from_pretty(
"i I
3 .
. 7
4 8
. 9",
);
let [output_1, output_2]: [_; 2] = builder
.append_chunk(input)
.collect_vec()
.try_into()
.unwrap();
for output in &[output_1, output_2] {
assert_eq!(3, output.cardinality());
assert_eq!(3, output.capacity());
assert!(output.is_compacted());
}
}
#[test]
fn test_consume_all() {
let mut builder = DataChunkBuilder::new(vec![DataType::Int32, DataType::Int64], 3);
assert!(builder.consume_all().is_none());
let input = SlicedDataChunk::new_checked(DataChunk::from_pretty(
"i I
3 .
. 7",
));
let (returned_input, output) = builder.append_chunk_inner(input);
assert!(returned_input.is_none());
assert!(output.is_none());
let output = builder.consume_all().expect("Failed to consume all!");
assert_eq!(2, output.cardinality());
assert_eq!(2, output.capacity());
assert!(output.is_compacted());
}
#[test]
fn test_append_one_row_from_array_elements() {
let mut builder = DataChunkBuilder::new(vec![DataType::Int32, DataType::Int64], 3);
assert!(builder.consume_all().is_none());
let mut left_array_builder = DataType::Int32.create_array_builder(5);
for v in [1, 2, 3, 4, 5] {
left_array_builder.append(Some(ScalarImpl::Int32(v)));
}
let left_arrays = [left_array_builder.finish()];
let mut right_array_builder = DataType::Int64.create_array_builder(5);
for v in [5, 4, 3, 2, 1] {
right_array_builder.append(Some(ScalarImpl::Int64(v)));
}
let right_arrays = [right_array_builder.finish()];
let mut output_chunks = Vec::new();
for i in 0..5 {
if let Some(chunk) = builder.append_one_row_from_array_elements(
left_arrays.iter(),
i,
right_arrays.iter(),
i,
) {
output_chunks.push(chunk)
}
}
if let Some(chunk) = builder.consume_all() {
output_chunks.push(chunk)
}
assert_eq!(
output_chunks,
vec![
DataChunk::from_pretty(
"i I
1 5
2 4
3 3"
),
DataChunk::from_pretty(
"i I
4 2
5 1"
),
]
)
}
}