// risingwave_common/util/chunk_coalesce.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::iter::FusedIterator;
16use std::mem::swap;
17
18use super::iter_util::ZipEqDebug;
19use crate::array::{ArrayBuilderImpl, ArrayImpl, DataChunk};
20use crate::row::Row;
21use crate::types::{DataType, ToDatumRef};
22
/// A [`SlicedDataChunk`] is a [`DataChunk`] with offset.
pub struct SlicedDataChunk {
    // The underlying chunk; rows before `offset` are considered already consumed.
    data_chunk: DataChunk,
    // Index of the first not-yet-consumed row. `with_offset_checked` asserts it is
    // strictly less than `data_chunk.capacity()`.
    offset: usize,
}
28
/// Used as a buffer for accumulating rows.
pub struct DataChunkBuilder {
    /// Data types for build array
    data_types: Vec<DataType>,
    // Number of rows at which a full chunk is emitted; asserted > 0 in `new`.
    batch_size: usize,

    /// Buffers storing current data
    // Lazily (re)created by `ensure_builders`; emptied again by `build_data_chunk`.
    array_builders: Vec<ArrayBuilderImpl>,
    // Number of rows currently buffered; `append_chunk_inner` asserts it never
    // exceeds `batch_size`.
    buffered_count: usize,
}
39
40impl DataChunkBuilder {
41    pub fn new(data_types: Vec<DataType>, batch_size: usize) -> Self {
42        assert!(batch_size > 0);
43
44        Self {
45            data_types,
46            batch_size,
47            array_builders: vec![],
48            buffered_count: 0,
49        }
50    }
51
52    pub fn batch_size(&self) -> usize {
53        self.batch_size
54    }
55
56    /// Lazily create the array builders if absent
57    fn ensure_builders(&mut self) {
58        if self.array_builders.len() != self.data_types.len() {
59            self.array_builders = self
60                .data_types
61                .iter()
62                .map(|data_type| data_type.create_array_builder(self.batch_size))
63                .collect::<Vec<ArrayBuilderImpl>>();
64
65            assert!(self.buffered_count == 0);
66        }
67    }
68
69    /// Returns not consumed input chunked data as sliced data chunk, and a data chunk of
70    /// `batch_size`.
71    ///
72    /// If `input_chunk` is not totally consumed, it's returned with a new offset, which is equal to
73    /// `old_offset + consumed_rows`. Otherwise the first value is `None`.
74    ///
75    /// If number of `batch_size` rows reached, it's returned as the second value of tuple.
76    /// Otherwise it's `None`.
77    #[must_use]
78    fn append_chunk_inner(
79        &mut self,
80        input_chunk: SlicedDataChunk,
81    ) -> (Option<SlicedDataChunk>, Option<DataChunk>) {
82        self.ensure_builders();
83
84        let mut new_return_offset = input_chunk.offset;
85        let vis = input_chunk.data_chunk.visibility();
86        if !vis.all() {
87            for vis in vis.iter().skip(input_chunk.offset) {
88                new_return_offset += 1;
89                if !vis {
90                    continue;
91                }
92
93                self.append_one_row_internal(&input_chunk.data_chunk, new_return_offset - 1);
94                if self.buffered_count >= self.batch_size {
95                    break;
96                }
97            }
98        } else {
99            let num_rows_to_append = std::cmp::min(
100                self.batch_size - self.buffered_count,
101                input_chunk.data_chunk.capacity() - input_chunk.offset,
102            );
103            let end_offset = input_chunk.offset + num_rows_to_append;
104            for input_row_idx in input_chunk.offset..end_offset {
105                new_return_offset += 1;
106                self.append_one_row_internal(&input_chunk.data_chunk, input_row_idx)
107            }
108        };
109
110        assert!(self.buffered_count <= self.batch_size);
111
112        let returned_input_chunk = if input_chunk.data_chunk.capacity() > new_return_offset {
113            Some(input_chunk.with_new_offset_checked(new_return_offset))
114        } else {
115            None
116        };
117
118        let output_chunk = if self.buffered_count == self.batch_size {
119            Some(self.build_data_chunk())
120        } else {
121            None
122        };
123
124        (returned_input_chunk, output_chunk)
125    }
126
127    /// Returns an iterator that yields data chunks during appending the whole input data chunk. The
128    /// iterator must be fully consumed to ensure all data is appended.
129    #[must_use]
130    pub fn append_chunk(&mut self, data_chunk: DataChunk) -> AppendDataChunk<'_> {
131        AppendDataChunk {
132            builder: self,
133            remaining: (data_chunk.capacity() > 0) // defensive check for empty chunk
134                .then(|| SlicedDataChunk::new_checked(data_chunk)),
135        }
136    }
137
138    /// Returns all data in current buffer.
139    ///
140    /// If `buffered_count` is 0, `None` is returned.
141    #[must_use]
142    pub fn consume_all(&mut self) -> Option<DataChunk> {
143        if self.buffered_count > 0 {
144            Some(self.build_data_chunk())
145        } else {
146            None
147        }
148    }
149
150    /// Build a data chunk from the current buffer.
151    pub fn finish(mut self) -> DataChunk {
152        self.build_data_chunk()
153    }
154
155    fn append_one_row_internal(&mut self, data_chunk: &DataChunk, row_idx: usize) {
156        self.do_append_one_row_from_datums(data_chunk.row_at(row_idx).0.iter());
157    }
158
159    fn do_append_one_row_from_datums(&mut self, datums: impl Iterator<Item = impl ToDatumRef>) {
160        for (array_builder, datum) in self.array_builders.iter_mut().zip_eq_debug(datums) {
161            array_builder.append(datum);
162        }
163        self.buffered_count += 1;
164    }
165
166    /// Append one row from the given [`Row`].
167    /// Return a data chunk if the buffer is full after append one row. Otherwise `None`.
168    #[must_use]
169    pub fn append_one_row(&mut self, row: impl Row) -> Option<DataChunk> {
170        self.append_one_row_no_finish(row);
171        if self.buffered_count == self.batch_size {
172            Some(self.build_data_chunk())
173        } else {
174            None
175        }
176    }
177
178    fn append_one_row_no_finish(&mut self, row: impl Row) {
179        assert!(self.buffered_count < self.batch_size);
180        self.ensure_builders();
181        self.do_append_one_row_from_datums(row.iter());
182    }
183
184    /// Append one row from the given two arrays.
185    /// Return a data chunk if the buffer is full after append one row. Otherwise `None`.
186    #[must_use]
187    pub fn append_one_row_from_array_elements<'a, I1, I2>(
188        &mut self,
189        left_arrays: I1,
190        left_row_id: usize,
191        right_arrays: I2,
192        right_row_id: usize,
193    ) -> Option<DataChunk>
194    where
195        I1: Iterator<Item = &'a ArrayImpl>,
196        I2: Iterator<Item = &'a ArrayImpl>,
197    {
198        assert!(self.buffered_count < self.batch_size);
199        self.ensure_builders();
200
201        for (array_builder, (array, row_id)) in self.array_builders.iter_mut().zip_eq_debug(
202            left_arrays
203                .map(|array| (array, left_row_id))
204                .chain(right_arrays.map(|array| (array, right_row_id))),
205        ) {
206            array_builder.append_array_element(array, row_id)
207        }
208
209        self.buffered_count += 1;
210
211        if self.buffered_count == self.batch_size {
212            Some(self.build_data_chunk())
213        } else {
214            None
215        }
216    }
217
218    fn build_data_chunk(&mut self) -> DataChunk {
219        let mut finished_array_builders = vec![];
220        swap(&mut finished_array_builders, &mut self.array_builders);
221        let cardinality = self.buffered_count;
222        self.buffered_count = 0;
223
224        let columns: Vec<_> = finished_array_builders
225            .into_iter()
226            .map(|builder| builder.finish().into())
227            .collect();
228        DataChunk::new(columns, cardinality)
229    }
230
231    pub fn buffered_count(&self) -> usize {
232        self.buffered_count
233    }
234
235    pub fn can_append_update(&self) -> bool {
236        self.buffered_count + 2 <= self.batch_size
237    }
238
239    pub fn num_columns(&self) -> usize {
240        self.data_types.len()
241    }
242
243    pub fn data_types(&self) -> Vec<DataType> {
244        self.data_types.clone()
245    }
246
247    pub fn is_empty(&self) -> bool {
248        self.buffered_count == 0
249    }
250
251    pub fn clear(&mut self) {
252        if !self.is_empty() {
253            self.array_builders.clear()
254        }
255        self.buffered_count = 0;
256    }
257}
258
259impl Drop for DataChunkBuilder {
260    fn drop(&mut self) {
261        // Possible to fail when async task gets cancelled.
262        if self.buffered_count != 0 {
263            tracing::warn!(
264                remaining = self.buffered_count,
265                "dropping non-empty data chunk builder"
266            );
267        }
268    }
269}
270
/// The iterator that yields data chunks during appending a data chunk to a [`DataChunkBuilder`].
pub struct AppendDataChunk<'a> {
    builder: &'a mut DataChunkBuilder,
    // The not-yet-consumed slice of the input chunk; `None` once fully appended.
    remaining: Option<SlicedDataChunk>,
}
276
277impl Iterator for AppendDataChunk<'_> {
278    type Item = DataChunk;
279
280    fn next(&mut self) -> Option<Self::Item> {
281        let (remaining, output) = self.builder.append_chunk_inner(self.remaining.take()?);
282        self.remaining = remaining;
283        output
284    }
285}
286
// Fused: after `remaining` is exhausted, the `?` on `Option::take` in `next` keeps
// returning `None` forever.
impl FusedIterator for AppendDataChunk<'_> {}
288
289impl Drop for AppendDataChunk<'_> {
290    fn drop(&mut self) {
291        // Possible to fail when async task gets cancelled.
292        if self.remaining.is_some() {
293            tracing::warn!("dropping `AppendDataChunk` without exhausting it");
294        }
295    }
296}
297
298impl SlicedDataChunk {
299    pub fn new_checked(data_chunk: DataChunk) -> Self {
300        SlicedDataChunk::with_offset_checked(data_chunk, 0)
301    }
302
303    pub fn with_offset_checked(data_chunk: DataChunk, offset: usize) -> Self {
304        assert!(
305            offset < data_chunk.capacity(),
306            "offset {}, data_chunk capacity {}",
307            offset,
308            data_chunk.capacity()
309        );
310        Self { data_chunk, offset }
311    }
312
313    pub fn with_new_offset_checked(self, new_offset: usize) -> Self {
314        SlicedDataChunk::with_offset_checked(self.data_chunk, new_offset)
315    }
316}
317
#[cfg(test)]
mod tests {
    use itertools::Itertools;

    use crate::array::DataChunk;
    use crate::test_prelude::DataChunkTestExt;
    use crate::types::{DataType, ScalarImpl};
    use crate::util::chunk_coalesce::{DataChunkBuilder, SlicedDataChunk};

    // Appending fully-visible chunks: partial fills buffer, overflow yields a full
    // chunk plus a leftover slice that can be re-appended.
    #[test]
    fn test_append_chunk() {
        let mut builder = DataChunkBuilder::new(vec![DataType::Int32, DataType::Int64], 3);

        // Append a chunk with 2 rows
        let input = SlicedDataChunk::new_checked(DataChunk::from_pretty(
            "i I
             3 .
             . 7",
        ));

        let (returned_input, output) = builder.append_chunk_inner(input);
        assert!(returned_input.is_none());
        assert!(output.is_none());

        // Append a chunk with 4 rows
        let input = SlicedDataChunk::new_checked(DataChunk::from_pretty(
            "i I
             3 .
             . 7
             4 8
             . 9",
        ));
        let (returned_input, output) = builder.append_chunk_inner(input);
        // Only 1 of 4 rows consumed before the buffer filled (2 buffered + 1 = batch 3).
        assert_eq!(Some(1), returned_input.as_ref().map(|c| c.offset));
        assert_eq!(Some(3), output.as_ref().map(DataChunk::cardinality));
        assert_eq!(Some(3), output.as_ref().map(DataChunk::capacity));
        assert!(output.unwrap().is_compacted());

        // Append last input
        let (returned_input, output) = builder.append_chunk_inner(returned_input.unwrap());
        assert!(returned_input.is_none());
        assert_eq!(Some(3), output.as_ref().map(DataChunk::cardinality));
        assert_eq!(Some(3), output.as_ref().map(DataChunk::capacity));
        assert!(output.unwrap().is_compacted());
    }

    // Same as above but with invisible rows (marked `D` in the pretty format): only
    // visible rows are buffered; offsets still count all scanned positions.
    #[test]
    fn test_append_chunk_with_bitmap() {
        let mut builder = DataChunkBuilder::new(vec![DataType::Int32, DataType::Int64], 3);

        // Append a chunk with 2 rows
        let input = SlicedDataChunk::new_checked(DataChunk::from_pretty(
            "i I
             3 .
             . 7 D",
        ));

        let (returned_input, output) = builder.append_chunk_inner(input);
        assert!(returned_input.is_none());
        assert!(output.is_none());
        assert_eq!(1, builder.buffered_count());

        // Append a chunk with 4 rows
        let input = SlicedDataChunk::new_checked(DataChunk::from_pretty(
            "i I
             3 . D
             . 7
             4 8
             . 9 D",
        ));
        let (returned_input, output) = builder.append_chunk_inner(input);
        assert_eq!(Some(3), returned_input.as_ref().map(|c| c.offset));
        assert_eq!(Some(3), output.as_ref().map(DataChunk::cardinality));
        assert_eq!(Some(3), output.as_ref().map(DataChunk::capacity));
        assert!(output.unwrap().is_compacted());
        assert_eq!(0, builder.buffered_count());

        // Append last input
        // The remaining row is invisible, so nothing is buffered and no chunk is emitted.
        let (returned_input, output) = builder.append_chunk_inner(returned_input.unwrap());
        assert!(returned_input.is_none());
        assert!(output.is_none());
        assert_eq!(0, builder.buffered_count());
    }

    // Same scenario as `test_append_chunk`, exercised through the public
    // `append_chunk` iterator API instead of `append_chunk_inner`.
    #[test]
    fn test_append_chunk_iter() {
        let mut builder = DataChunkBuilder::new(vec![DataType::Int32, DataType::Int64], 3);

        // Append a chunk with 2 rows
        let input = DataChunk::from_pretty(
            "i I
             3 .
             . 7",
        );

        let outputs = builder.append_chunk(input).collect_vec();
        assert!(outputs.is_empty());

        // Append a chunk with 4 rows
        let input = DataChunk::from_pretty(
            "i I
             3 .
             . 7
             4 8
             . 9",
        );

        // 2 buffered + 4 new rows = two full chunks of 3.
        let [output_1, output_2]: [_; 2] = builder
            .append_chunk(input)
            .collect_vec()
            .try_into()
            .unwrap();

        for output in &[output_1, output_2] {
            assert_eq!(3, output.cardinality());
            assert_eq!(3, output.capacity());
            assert!(output.is_compacted());
        }
    }

    // `consume_all` flushes a partially filled buffer and returns `None` when empty.
    #[test]
    fn test_consume_all() {
        let mut builder = DataChunkBuilder::new(vec![DataType::Int32, DataType::Int64], 3);

        // It should return `None` when builder is empty
        assert!(builder.consume_all().is_none());

        // Append a chunk with 2 rows
        let input = SlicedDataChunk::new_checked(DataChunk::from_pretty(
            "i I
             3 .
             . 7",
        ));

        let (returned_input, output) = builder.append_chunk_inner(input);
        assert!(returned_input.is_none());
        assert!(output.is_none());

        let output = builder.consume_all().expect("Failed to consume all!");
        assert_eq!(2, output.cardinality());
        assert_eq!(2, output.capacity());
        assert!(output.is_compacted());
    }

    // Row-by-row building from two column sets (as in a join output): full chunks are
    // emitted at batch boundaries, the remainder via `consume_all`.
    #[test]
    fn test_append_one_row_from_array_elements() {
        let mut builder = DataChunkBuilder::new(vec![DataType::Int32, DataType::Int64], 3);

        assert!(builder.consume_all().is_none());

        let mut left_array_builder = DataType::Int32.create_array_builder(5);
        for v in [1, 2, 3, 4, 5] {
            left_array_builder.append(Some(ScalarImpl::Int32(v)));
        }
        let left_arrays = [left_array_builder.finish()];

        let mut right_array_builder = DataType::Int64.create_array_builder(5);
        for v in [5, 4, 3, 2, 1] {
            right_array_builder.append(Some(ScalarImpl::Int64(v)));
        }
        let right_arrays = [right_array_builder.finish()];

        let mut output_chunks = Vec::new();

        for i in 0..5 {
            if let Some(chunk) = builder.append_one_row_from_array_elements(
                left_arrays.iter(),
                i,
                right_arrays.iter(),
                i,
            ) {
                output_chunks.push(chunk)
            }
        }

        if let Some(chunk) = builder.consume_all() {
            output_chunks.push(chunk)
        }

        assert_eq!(
            output_chunks,
            vec![
                DataChunk::from_pretty(
                    "i I
                    1 5
                    2 4
                    3 3"
                ),
                DataChunk::from_pretty(
                    "i I
                    4 2
                    5 1"
                ),
            ]
        )
    }
}