// risingwave_common/array/stream_chunk_builder.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::array::stream_record::Record;
16use crate::array::{ArrayBuilderImpl, Op, StreamChunk};
17use crate::bitmap::BitmapBuilder;
18use crate::row::Row;
19use crate::types::{DataType, DatumRef};
20use crate::util::iter_util::ZipEqFast;
21
22/// Build stream chunks with fixed chunk size from rows or records.
/// Build stream chunks with fixed chunk size from rows or records.
pub struct StreamChunkBuilder {
    /// Operation (`Insert` / `Delete` / `UpdateDelete` / `UpdateInsert`) of each
    /// pending row, in append order.
    ops: Vec<Op>,

    /// One array builder per column; rows are appended column-wise.
    column_builders: Vec<ArrayBuilderImpl>,

    /// Visibility bit of each pending row (`false` = invisible).
    vis_builder: BitmapBuilder,

    /// Data types of columns, used to create fresh array builders when a chunk is taken.
    data_types: Vec<DataType>,

    /// Max number of rows in a chunk. When it's `Some(n)`, the chunk builder will, if necessary,
    /// yield a chunk when appending records; the chunk size is at most `n`, except that a chunk
    /// ending with an `UpdateDelete`/`UpdateInsert` pair may contain `n + 1` rows, since the
    /// pair is never split across chunks.
    /// When it's `None`, the chunk builder will yield chunks only when `take` is called.
    max_chunk_size: Option<usize>,

    /// The initial capacity of `ops` and `ArrayBuilder`s.
    initial_capacity: usize,

    /// Number of currently pending rows.
    size: usize,
}
47
48impl Drop for StreamChunkBuilder {
49    fn drop(&mut self) {
50        // Possible to fail when async task gets cancelled.
51        if self.size != 0 {
52            tracing::warn!(
53                remaining = self.size,
54                "dropping non-empty stream chunk builder"
55            );
56        }
57    }
58}
59
/// Upper bound on the pre-allocated capacity of `ops` and the array builders, so that a
/// huge `max_chunk_size` doesn't cause an excessive up-front allocation.
const MAX_INITIAL_CAPACITY: usize = 4096;
/// Pre-allocated capacity used by `unlimited` builders when the caller doesn't provide one.
const DEFAULT_INITIAL_CAPACITY: usize = 64;
62
63impl StreamChunkBuilder {
64    /// Create a new `StreamChunkBuilder` with a fixed max chunk size.
65    /// Note that in the case of ending with `Update`, the builder may yield a chunk with size
66    /// `max_chunk_size + 1`.
67    pub fn new(max_chunk_size: usize, data_types: Vec<DataType>) -> Self {
68        assert!(max_chunk_size > 0);
69
70        let initial_capacity = max_chunk_size.min(MAX_INITIAL_CAPACITY);
71
72        let ops = Vec::with_capacity(initial_capacity);
73        let column_builders = data_types
74            .iter()
75            .map(|datatype| datatype.create_array_builder(initial_capacity))
76            .collect();
77        let vis_builder = BitmapBuilder::with_capacity(initial_capacity);
78        Self {
79            ops,
80            column_builders,
81            data_types,
82            vis_builder,
83            max_chunk_size: Some(max_chunk_size),
84            initial_capacity,
85            size: 0,
86        }
87    }
88
89    /// Create a new `StreamChunkBuilder` with unlimited chunk size.
90    /// The builder will only yield chunks when `take` is called.
91    pub fn unlimited(data_types: Vec<DataType>, initial_capacity: Option<usize>) -> Self {
92        let initial_capacity = initial_capacity.unwrap_or(DEFAULT_INITIAL_CAPACITY);
93        Self {
94            ops: Vec::with_capacity(initial_capacity),
95            column_builders: data_types
96                .iter()
97                .map(|datatype| datatype.create_array_builder(initial_capacity))
98                .collect(),
99            data_types,
100            vis_builder: BitmapBuilder::default(),
101            max_chunk_size: None,
102            initial_capacity,
103            size: 0,
104        }
105    }
106
107    pub fn build_empty(data_types: Vec<DataType>) -> StreamChunk {
108        Self::new(1, data_types).take_inner()
109    }
110
111    /// Get the current number of rows in the builder.
112    pub fn size(&self) -> usize {
113        self.size
114    }
115
116    /// Append an iterator of output index and datum to the builder, return a chunk if the builder
117    /// is full.
118    ///
119    /// Note: the caller must ensure that each column occurs exactly once in `iter`.
120    #[must_use]
121    pub fn append_iter<'a>(
122        &mut self,
123        op: Op,
124        iter: impl IntoIterator<Item = (usize, DatumRef<'a>)>,
125    ) -> Option<StreamChunk> {
126        self.append_iter_inner::<true>(op, iter)
127    }
128
129    /// Append a row to the builder, return a chunk if the builder is full.
130    #[must_use]
131    pub fn append_row(&mut self, op: Op, row: impl Row) -> Option<StreamChunk> {
132        self.append_iter_inner::<true>(op, row.iter().enumerate())
133    }
134
135    /// Append an invisible row to the builder, return a chunk if the builder is full.
136    #[must_use]
137    pub fn append_row_invisible(&mut self, op: Op, row: impl Row) -> Option<StreamChunk> {
138        self.append_iter_inner::<false>(op, row.iter().enumerate())
139    }
140
141    /// Append a record to the builder, return a chunk if the builder is full.
142    #[must_use]
143    pub fn append_record(&mut self, record: Record<impl Row>) -> Option<StreamChunk> {
144        match record {
145            Record::Insert { new_row } => {
146                self.append_iter_inner::<true>(Op::Insert, new_row.iter().enumerate())
147            }
148            Record::Delete { old_row } => {
149                self.append_iter_inner::<true>(Op::Delete, old_row.iter().enumerate())
150            }
151            Record::Update { old_row, new_row } => {
152                let none =
153                    self.append_iter_inner::<true>(Op::UpdateDelete, old_row.iter().enumerate());
154                assert!(none.is_none());
155                self.append_iter_inner::<true>(Op::UpdateInsert, new_row.iter().enumerate())
156            }
157        }
158    }
159
160    /// Take all the pending data and return a chunk. If there is no pending data, return `None`.
161    /// Note that if this is an unlimited chunk builder, the only way to get a chunk is to call
162    /// `take`.
163    #[must_use]
164    pub fn take(&mut self) -> Option<StreamChunk> {
165        if self.size == 0 {
166            return None;
167        }
168        Some(self.take_inner())
169    }
170
171    fn take_inner(&mut self) -> StreamChunk {
172        self.size = 0;
173
174        let ops = std::mem::replace(&mut self.ops, Vec::with_capacity(self.initial_capacity));
175        let columns = self
176            .column_builders
177            .iter_mut()
178            .zip_eq_fast(&self.data_types)
179            .map(|(builder, datatype)| {
180                std::mem::replace(
181                    builder,
182                    datatype.create_array_builder(self.initial_capacity),
183                )
184                .finish()
185            })
186            .map(Into::into)
187            .collect::<Vec<_>>();
188        let vis = std::mem::take(&mut self.vis_builder).finish();
189
190        StreamChunk::with_visibility(ops, columns, vis)
191    }
192
193    #[must_use]
194    fn append_iter_inner<'a, const VIS: bool>(
195        &mut self,
196        op: Op,
197        iter: impl IntoIterator<Item = (usize, DatumRef<'a>)>,
198    ) -> Option<StreamChunk> {
199        self.ops.push(op);
200        for (i, datum) in iter {
201            self.column_builders[i].append(datum);
202        }
203        self.vis_builder.append(VIS);
204        self.size += 1;
205
206        if let Some(max_chunk_size) = self.max_chunk_size {
207            if self.size == max_chunk_size && !op.is_update_delete() || self.size > max_chunk_size {
208                // Two situations here:
209                // 1. `self.size == max_chunk_size && op == Op::UpdateDelete`
210                //    We should wait for next `UpdateInsert` to join the chunk.
211                // 2. `self.size > max_chunk_size`
212                //    Here we assert that `self.size == max_chunk_size + 1`. It's possible that
213                //    the `Op` after `UpdateDelete` is not `UpdateInsert`, if something inconsistent
214                //    happens, we should still take the existing data.
215                self.take()
216            } else {
217                None
218            }
219        } else {
220            // unlimited
221            None
222        }
223    }
224}
225
#[cfg(test)]
mod tests {
    use super::*;
    use crate::array::{Datum, StreamChunkTestExt};
    use crate::row::OwnedRow;

    /// Exercises a size-3 builder: chunks are yielded exactly when the third row
    /// arrives, `take` on an empty builder returns `None`, invisible rows are
    /// marked `D` in the pretty output, and an `UpdateDelete`/`UpdateInsert` pair
    /// is never split across chunks.
    #[test]
    fn test_stream_chunk_builder() {
        let row = OwnedRow::new(vec![Datum::None, Datum::None]);
        let mut builder = StreamChunkBuilder::new(3, vec![DataType::Int32, DataType::Int32]);
        // First two rows: builder not yet full.
        let res = builder.append_row(Op::Delete, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::Insert, row.clone());
        assert!(res.is_none());
        // Third row fills the builder and yields the chunk.
        let res = builder.append_row(Op::Insert, row.clone());
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                "  i i
                 - . .
                 + . .
                 + . ."
            ))
        );
        // Builder is now empty, so `take` yields nothing.
        let res = builder.take();
        assert!(res.is_none());

        // Invisible rows still count toward the chunk size; `D` marks invisibility.
        let res = builder.append_row_invisible(Op::Delete, row.clone());
        assert!(res.is_none());
        let res = builder.append_iter(Op::Delete, row.clone().iter().enumerate());
        assert!(res.is_none());
        let res = builder.append_record(Record::Insert {
            new_row: row.clone(),
        });
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                "  i i
                 - . . D
                 - . .
                 + . ."
            ))
        );

        // Filling up to the limit with a dangling `UpdateDelete` (row 3) defers the
        // yield; the chunk with `max_chunk_size + 1` rows is emitted once the
        // matching `UpdateInsert` arrives.
        let res = builder.append_row(Op::UpdateDelete, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::UpdateInsert, row.clone());
        assert!(res.is_none());
        let res = builder.append_record(Record::Update {
            old_row: row.clone(),
            new_row: row.clone(),
        });
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                "  i i
                U- . .
                U+ . .
                U- . .
                U+ . ."
            ))
        );
        let res = builder.take();
        assert!(res.is_none());
    }

    /// Edge case `max_chunk_size == 1`: every visible row yields immediately,
    /// except a dangling `UpdateDelete`, which waits for one more row.
    #[test]
    fn test_stream_chunk_builder_with_max_size_1() {
        let row = OwnedRow::new(vec![Datum::None, Datum::None]);
        let mut builder = StreamChunkBuilder::new(1, vec![DataType::Int32, DataType::Int32]);

        let res = builder.append_row(Op::Delete, row.clone());
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                "  i i
                 - . ."
            ))
        );
        let res = builder.append_row(Op::Insert, row.clone());
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                "  i i
                 + . ."
            ))
        );

        // An update pair exceeds the limit by one but stays in one chunk.
        let res = builder.append_record(Record::Update {
            old_row: row.clone(),
            new_row: row.clone(),
        });
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                "  i i
                U- . .
                U+ . ."
            ))
        );

        // Even if the op after `UpdateDelete` is not `UpdateInsert` (inconsistent
        // input), the builder still flushes once `size > max_chunk_size`.
        let res = builder.append_row(Op::UpdateDelete, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::UpdateDelete, row.clone()); // note this is an inconsistency
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                "  i i
                U- . .
                U- . ."
            ))
        );
    }

    /// An unlimited builder never yields on append, regardless of how many rows
    /// are buffered; only `take` produces the (single, large) chunk.
    #[test]
    fn test_unlimited_stream_chunk_builder() {
        let row = OwnedRow::new(vec![Datum::None, Datum::None]);
        let mut builder =
            StreamChunkBuilder::unlimited(vec![DataType::Int32, DataType::Int32], None);

        let res = builder.append_row(Op::Delete, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::Insert, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::UpdateDelete, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::UpdateInsert, row.clone());
        assert!(res.is_none());

        for _ in 0..2048 {
            let res = builder.append_record(Record::Update {
                old_row: row.clone(),
                new_row: row.clone(),
            });
            assert!(res.is_none());
        }

        // 4 single rows + 2048 update pairs = 2048 * 2 + 4 rows in one chunk.
        let res = builder.take();
        assert_eq!(res.unwrap().capacity(), 2048 * 2 + 4);
    }
}
366}