1use std::iter::FusedIterator;
16use std::mem::swap;
17
18use super::iter_util::ZipEqDebug;
19use crate::array::{ArrayBuilderImpl, ArrayImpl, DataChunk};
20use crate::row::Row;
21use crate::types::{DataType, ToDatumRef};
22
23pub struct SlicedDataChunk {
25 data_chunk: DataChunk,
26 offset: usize,
27}
28
29pub struct DataChunkBuilder {
31 data_types: Vec<DataType>,
33 batch_size: usize,
34
35 array_builders: Vec<ArrayBuilderImpl>,
37 buffered_count: usize,
38}
39
40impl DataChunkBuilder {
41 pub fn new(data_types: Vec<DataType>, batch_size: usize) -> Self {
42 assert!(batch_size > 0);
43
44 Self {
45 data_types,
46 batch_size,
47 array_builders: vec![],
48 buffered_count: 0,
49 }
50 }
51
52 pub fn batch_size(&self) -> usize {
53 self.batch_size
54 }
55
56 fn ensure_builders(&mut self) {
58 if self.array_builders.len() != self.data_types.len() {
59 self.array_builders = self
60 .data_types
61 .iter()
62 .map(|data_type| data_type.create_array_builder(self.batch_size))
63 .collect::<Vec<ArrayBuilderImpl>>();
64
65 assert!(self.buffered_count == 0);
66 }
67 }
68
69 #[must_use]
78 fn append_chunk_inner(
79 &mut self,
80 input_chunk: SlicedDataChunk,
81 ) -> (Option<SlicedDataChunk>, Option<DataChunk>) {
82 self.ensure_builders();
83
84 let mut new_return_offset = input_chunk.offset;
85 let vis = input_chunk.data_chunk.visibility();
86 if !vis.all() {
87 for vis in vis.iter().skip(input_chunk.offset) {
88 new_return_offset += 1;
89 if !vis {
90 continue;
91 }
92
93 self.append_one_row_internal(&input_chunk.data_chunk, new_return_offset - 1);
94 if self.buffered_count >= self.batch_size {
95 break;
96 }
97 }
98 } else {
99 let num_rows_to_append = std::cmp::min(
100 self.batch_size - self.buffered_count,
101 input_chunk.data_chunk.capacity() - input_chunk.offset,
102 );
103 let end_offset = input_chunk.offset + num_rows_to_append;
104 for input_row_idx in input_chunk.offset..end_offset {
105 new_return_offset += 1;
106 self.append_one_row_internal(&input_chunk.data_chunk, input_row_idx)
107 }
108 };
109
110 assert!(self.buffered_count <= self.batch_size);
111
112 let returned_input_chunk = if input_chunk.data_chunk.capacity() > new_return_offset {
113 Some(input_chunk.with_new_offset_checked(new_return_offset))
114 } else {
115 None
116 };
117
118 let output_chunk = if self.buffered_count == self.batch_size {
119 Some(self.build_data_chunk())
120 } else {
121 None
122 };
123
124 (returned_input_chunk, output_chunk)
125 }
126
127 #[must_use]
130 pub fn append_chunk(&mut self, data_chunk: DataChunk) -> AppendDataChunk<'_> {
131 AppendDataChunk {
132 builder: self,
133 remaining: (data_chunk.capacity() > 0) .then(|| SlicedDataChunk::new_checked(data_chunk)),
135 }
136 }
137
138 #[must_use]
142 pub fn consume_all(&mut self) -> Option<DataChunk> {
143 if self.buffered_count > 0 {
144 Some(self.build_data_chunk())
145 } else {
146 None
147 }
148 }
149
150 pub fn finish(mut self) -> DataChunk {
152 self.build_data_chunk()
153 }
154
155 fn append_one_row_internal(&mut self, data_chunk: &DataChunk, row_idx: usize) {
156 self.do_append_one_row_from_datums(data_chunk.row_at(row_idx).0.iter());
157 }
158
159 fn do_append_one_row_from_datums(&mut self, datums: impl Iterator<Item = impl ToDatumRef>) {
160 for (array_builder, datum) in self.array_builders.iter_mut().zip_eq_debug(datums) {
161 array_builder.append(datum);
162 }
163 self.buffered_count += 1;
164 }
165
166 #[must_use]
169 pub fn append_one_row(&mut self, row: impl Row) -> Option<DataChunk> {
170 self.append_one_row_no_finish(row);
171 if self.buffered_count == self.batch_size {
172 Some(self.build_data_chunk())
173 } else {
174 None
175 }
176 }
177
178 fn append_one_row_no_finish(&mut self, row: impl Row) {
179 assert!(self.buffered_count < self.batch_size);
180 self.ensure_builders();
181 self.do_append_one_row_from_datums(row.iter());
182 }
183
184 #[must_use]
187 pub fn append_one_row_from_array_elements<'a, I1, I2>(
188 &mut self,
189 left_arrays: I1,
190 left_row_id: usize,
191 right_arrays: I2,
192 right_row_id: usize,
193 ) -> Option<DataChunk>
194 where
195 I1: Iterator<Item = &'a ArrayImpl>,
196 I2: Iterator<Item = &'a ArrayImpl>,
197 {
198 assert!(self.buffered_count < self.batch_size);
199 self.ensure_builders();
200
201 for (array_builder, (array, row_id)) in self.array_builders.iter_mut().zip_eq_debug(
202 left_arrays
203 .map(|array| (array, left_row_id))
204 .chain(right_arrays.map(|array| (array, right_row_id))),
205 ) {
206 array_builder.append_array_element(array, row_id)
207 }
208
209 self.buffered_count += 1;
210
211 if self.buffered_count == self.batch_size {
212 Some(self.build_data_chunk())
213 } else {
214 None
215 }
216 }
217
218 fn build_data_chunk(&mut self) -> DataChunk {
219 let mut finished_array_builders = vec![];
220 swap(&mut finished_array_builders, &mut self.array_builders);
221 let cardinality = self.buffered_count;
222 self.buffered_count = 0;
223
224 let columns: Vec<_> = finished_array_builders
225 .into_iter()
226 .map(|builder| builder.finish().into())
227 .collect();
228 DataChunk::new(columns, cardinality)
229 }
230
231 pub fn buffered_count(&self) -> usize {
232 self.buffered_count
233 }
234
235 pub fn can_append_update(&self) -> bool {
236 self.buffered_count + 2 <= self.batch_size
237 }
238
239 pub fn num_columns(&self) -> usize {
240 self.data_types.len()
241 }
242
243 pub fn data_types(&self) -> Vec<DataType> {
244 self.data_types.clone()
245 }
246
247 pub fn is_empty(&self) -> bool {
248 self.buffered_count == 0
249 }
250
251 pub fn clear(&mut self) {
252 if !self.is_empty() {
253 self.array_builders.clear()
254 }
255 self.buffered_count = 0;
256 }
257}
258
259impl Drop for DataChunkBuilder {
260 fn drop(&mut self) {
261 if self.buffered_count != 0 {
263 tracing::warn!(
264 remaining = self.buffered_count,
265 "dropping non-empty data chunk builder"
266 );
267 }
268 }
269}
270
271pub struct AppendDataChunk<'a> {
273 builder: &'a mut DataChunkBuilder,
274 remaining: Option<SlicedDataChunk>,
275}
276
277impl Iterator for AppendDataChunk<'_> {
278 type Item = DataChunk;
279
280 fn next(&mut self) -> Option<Self::Item> {
281 let (remaining, output) = self.builder.append_chunk_inner(self.remaining.take()?);
282 self.remaining = remaining;
283 output
284 }
285}
286
287impl FusedIterator for AppendDataChunk<'_> {}
288
289impl Drop for AppendDataChunk<'_> {
290 fn drop(&mut self) {
291 if self.remaining.is_some() {
293 tracing::warn!("dropping `AppendDataChunk` without exhausting it");
294 }
295 }
296}
297
298impl SlicedDataChunk {
299 pub fn new_checked(data_chunk: DataChunk) -> Self {
300 SlicedDataChunk::with_offset_checked(data_chunk, 0)
301 }
302
303 pub fn with_offset_checked(data_chunk: DataChunk, offset: usize) -> Self {
304 assert!(
305 offset < data_chunk.capacity(),
306 "offset {}, data_chunk capacity {}",
307 offset,
308 data_chunk.capacity()
309 );
310 Self { data_chunk, offset }
311 }
312
313 pub fn with_new_offset_checked(self, new_offset: usize) -> Self {
314 SlicedDataChunk::with_offset_checked(self.data_chunk, new_offset)
315 }
316}
317
#[cfg(test)]
mod tests {
    use itertools::Itertools;

    use crate::array::DataChunk;
    use crate::test_prelude::DataChunkTestExt;
    use crate::types::{DataType, ScalarImpl};
    use crate::util::chunk_coalesce::{DataChunkBuilder, SlicedDataChunk};

    /// Builder over an `(Int32, Int64)` schema emitting chunks of three rows.
    fn new_builder() -> DataChunkBuilder {
        DataChunkBuilder::new(vec![DataType::Int32, DataType::Int64], 3)
    }

    /// Asserts that `chunk` holds exactly three rows and has no invisible rows.
    fn assert_full_compacted(chunk: &DataChunk) {
        assert_eq!(3, chunk.cardinality());
        assert_eq!(3, chunk.capacity());
        assert!(chunk.is_compacted());
    }

    #[test]
    fn test_append_chunk() {
        let mut builder = new_builder();

        // Two rows fit into the empty buffer without filling it.
        let first = SlicedDataChunk::new_checked(DataChunk::from_pretty(
            "i I
             3 .
             . 7",
        ));
        let (rest, full) = builder.append_chunk_inner(first);
        assert!(rest.is_none());
        assert!(full.is_none());

        // One row completes the buffer; the other three remain starting at offset 1.
        let second = SlicedDataChunk::new_checked(DataChunk::from_pretty(
            "i I
             3 .
             . 7
             4 8
             . 9",
        ));
        let (rest, full) = builder.append_chunk_inner(second);
        assert_eq!(Some(1), rest.as_ref().map(|c| c.offset));
        assert_full_compacted(&full.unwrap());

        // The three leftover rows form exactly one more full chunk.
        let (rest, full) = builder.append_chunk_inner(rest.unwrap());
        assert!(rest.is_none());
        assert_full_compacted(&full.unwrap());
    }

    #[test]
    fn test_append_chunk_with_bitmap() {
        let mut builder = new_builder();

        // Only the visible row is buffered.
        let first = SlicedDataChunk::new_checked(DataChunk::from_pretty(
            "i I
             3 .
             . 7 D",
        ));
        let (rest, full) = builder.append_chunk_inner(first);
        assert!(rest.is_none());
        assert!(full.is_none());
        assert_eq!(1, builder.buffered_count());

        // Rows 1 and 2 complete the buffer; examination stops before row 3.
        let second = SlicedDataChunk::new_checked(DataChunk::from_pretty(
            "i I
             3 . D
             . 7
             4 8
             . 9 D",
        ));
        let (rest, full) = builder.append_chunk_inner(second);
        assert_eq!(Some(3), rest.as_ref().map(|c| c.offset));
        assert_full_compacted(&full.unwrap());
        assert_eq!(0, builder.buffered_count());

        // Only an invisible row is left: nothing is buffered or emitted.
        let (rest, full) = builder.append_chunk_inner(rest.unwrap());
        assert!(rest.is_none());
        assert!(full.is_none());
        assert_eq!(0, builder.buffered_count());
    }

    #[test]
    fn test_append_chunk_iter() {
        let mut builder = new_builder();

        let first = DataChunk::from_pretty(
            "i I
             3 .
             . 7",
        );
        assert!(builder.append_chunk(first).collect_vec().is_empty());

        let second = DataChunk::from_pretty(
            "i I
             3 .
             . 7
             4 8
             . 9",
        );
        let outputs = builder.append_chunk(second).collect_vec();
        assert_eq!(2, outputs.len());
        outputs.iter().for_each(assert_full_compacted);
    }

    #[test]
    fn test_consume_all() {
        let mut builder = new_builder();

        // Nothing has been buffered yet.
        assert!(builder.consume_all().is_none());

        let input = SlicedDataChunk::new_checked(DataChunk::from_pretty(
            "i I
             3 .
             . 7",
        ));
        let (rest, full) = builder.append_chunk_inner(input);
        assert!(rest.is_none());
        assert!(full.is_none());

        let partial = builder.consume_all().expect("Failed to consume all!");
        assert_eq!(2, partial.cardinality());
        assert_eq!(2, partial.capacity());
        assert!(partial.is_compacted());
    }

    #[test]
    fn test_append_one_row_from_array_elements() {
        let mut builder = new_builder();

        assert!(builder.consume_all().is_none());

        let mut left_builder = DataType::Int32.create_array_builder(5);
        for v in 1..=5 {
            left_builder.append(Some(ScalarImpl::Int32(v)));
        }
        let left_arrays = [left_builder.finish()];

        let mut right_builder = DataType::Int64.create_array_builder(5);
        for v in (1..=5).rev() {
            right_builder.append(Some(ScalarImpl::Int64(v)));
        }
        let right_arrays = [right_builder.finish()];

        let mut output_chunks: Vec<DataChunk> = (0..5)
            .filter_map(|i| {
                builder.append_one_row_from_array_elements(
                    left_arrays.iter(),
                    i,
                    right_arrays.iter(),
                    i,
                )
            })
            .collect();
        output_chunks.extend(builder.consume_all());

        assert_eq!(
            output_chunks,
            vec![
                DataChunk::from_pretty(
                    "i I
                     1 5
                     2 4
                     3 3"
                ),
                DataChunk::from_pretty(
                    "i I
                     4 2
                     5 1"
                ),
            ]
        )
    }
}