risingwave_common/array/
stream_chunk_builder.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::array::stream_record::Record;
16use crate::array::{ArrayBuilderImpl, Op, StreamChunk};
17use crate::bitmap::BitmapBuilder;
18use crate::row::Row;
19use crate::types::{DataType, DatumRef};
20use crate::util::iter_util::ZipEqFast;
21
/// Build stream chunks with fixed chunk size from rows or records.
pub struct StreamChunkBuilder {
    /// Operation (`Insert`/`Delete`/`UpdateDelete`/`UpdateInsert`) of each pending row,
    /// in append order.
    ops: Vec<Op>,

    /// One array builder per column, accumulating the datums of the pending rows.
    column_builders: Vec<ArrayBuilderImpl>,

    /// Visibility bit of each pending row; rows appended via `append_row_invisible`
    /// get a `false` bit here.
    vis_builder: BitmapBuilder,

    /// Data types of columns, kept so fresh array builders can be created on `take`.
    data_types: Vec<DataType>,

    /// Max number of rows in a chunk. When it's `Some(n)`, the chunk builder will, if necessary,
    /// yield a chunk of which the size is strictly less than or equal to `n` when appending records.
    /// When it's `None`, the chunk builder will yield chunks only when `take` is called.
    max_chunk_size: Option<usize>,

    /// The initial capacity of `ops` and `ArrayBuilder`s, reused when the builders are
    /// re-created after `take`.
    initial_capacity: usize,

    /// Number of currently pending rows (equals `ops.len()`).
    size: usize,
}
47
48impl Drop for StreamChunkBuilder {
49    fn drop(&mut self) {
50        // Possible to fail when async task gets cancelled.
51        if self.size != 0 {
52            tracing::warn!(
53                remaining = self.size,
54                "dropping non-empty stream chunk builder"
55            );
56        }
57    }
58}
59
/// Upper bound on the preallocated capacity, so a huge `max_chunk_size` doesn't
/// reserve excessive memory upfront.
const MAX_INITIAL_CAPACITY: usize = 4096;
/// Capacity used by `unlimited` when the caller doesn't specify one.
const DEFAULT_INITIAL_CAPACITY: usize = 64;
62
63impl StreamChunkBuilder {
64    /// Create a new `StreamChunkBuilder` with a fixed max chunk size.
65    /// Note that in the case of ending with `Update`, the builder may yield a chunk with size
66    /// `max_chunk_size + 1`.
67    pub fn new(max_chunk_size: usize, data_types: Vec<DataType>) -> Self {
68        assert!(max_chunk_size > 0);
69
70        let initial_capacity = max_chunk_size.min(MAX_INITIAL_CAPACITY);
71
72        let ops = Vec::with_capacity(initial_capacity);
73        let column_builders = data_types
74            .iter()
75            .map(|datatype| datatype.create_array_builder(initial_capacity))
76            .collect();
77        let vis_builder = BitmapBuilder::with_capacity(initial_capacity);
78        Self {
79            ops,
80            column_builders,
81            data_types,
82            vis_builder,
83            max_chunk_size: Some(max_chunk_size),
84            initial_capacity,
85            size: 0,
86        }
87    }
88
89    /// Create a new `StreamChunkBuilder` with unlimited chunk size.
90    /// The builder will only yield chunks when `take` is called.
91    pub fn unlimited(data_types: Vec<DataType>, initial_capacity: Option<usize>) -> Self {
92        let initial_capacity = initial_capacity.unwrap_or(DEFAULT_INITIAL_CAPACITY);
93        Self {
94            ops: Vec::with_capacity(initial_capacity),
95            column_builders: data_types
96                .iter()
97                .map(|datatype| datatype.create_array_builder(initial_capacity))
98                .collect(),
99            data_types,
100            vis_builder: BitmapBuilder::default(),
101            max_chunk_size: None,
102            initial_capacity,
103            size: 0,
104        }
105    }
106
107    pub fn build_empty(data_types: Vec<DataType>) -> StreamChunk {
108        Self::new(1, data_types).take_inner()
109    }
110
111    /// Get the current number of rows in the builder.
112    pub fn size(&self) -> usize {
113        self.size
114    }
115
116    /// Append an iterator of output index and datum to the builder, return a chunk if the builder
117    /// is full.
118    ///
119    /// Note: the caller must ensure that each column occurs exactly once in `iter`.
120    #[must_use]
121    pub fn append_iter<'a>(
122        &mut self,
123        op: Op,
124        iter: impl IntoIterator<Item = (usize, DatumRef<'a>)>,
125    ) -> Option<StreamChunk> {
126        self.append_iter_inner::<true>(op, iter)
127    }
128
129    /// Append a row to the builder, return a chunk if the builder is full.
130    #[must_use]
131    pub fn append_row(&mut self, op: Op, row: impl Row) -> Option<StreamChunk> {
132        self.append_iter_inner::<true>(op, row.iter().enumerate())
133    }
134
135    /// Append an invisible row to the builder, return a chunk if the builder is full.
136    #[must_use]
137    pub fn append_row_invisible(&mut self, op: Op, row: impl Row) -> Option<StreamChunk> {
138        self.append_iter_inner::<false>(op, row.iter().enumerate())
139    }
140
141    /// Append a record to the builder, return a chunk if the builder is full.
142    #[must_use]
143    pub fn append_record(&mut self, record: Record<impl Row>) -> Option<StreamChunk> {
144        match record {
145            Record::Insert { new_row } => self.append_row(Op::Insert, new_row),
146            Record::Delete { old_row } => self.append_row(Op::Delete, old_row),
147            Record::Update { old_row, new_row } => {
148                let none = self.append_row(Op::UpdateDelete, old_row);
149                assert!(none.is_none());
150                self.append_row(Op::UpdateInsert, new_row)
151            }
152        }
153    }
154
155    /// Take all the pending data and return a chunk. If there is no pending data, return `None`.
156    /// Note that if this is an unlimited chunk builder, the only way to get a chunk is to call
157    /// `take`.
158    #[must_use]
159    pub fn take(&mut self) -> Option<StreamChunk> {
160        if self.size == 0 {
161            return None;
162        }
163        Some(self.take_inner())
164    }
165
166    fn take_inner(&mut self) -> StreamChunk {
167        self.size = 0;
168
169        let ops = std::mem::replace(&mut self.ops, Vec::with_capacity(self.initial_capacity));
170        let columns = self
171            .column_builders
172            .iter_mut()
173            .zip_eq_fast(&self.data_types)
174            .map(|(builder, datatype)| {
175                std::mem::replace(
176                    builder,
177                    datatype.create_array_builder(self.initial_capacity),
178                )
179                .finish()
180            })
181            .map(Into::into)
182            .collect::<Vec<_>>();
183        let vis = std::mem::take(&mut self.vis_builder).finish();
184
185        StreamChunk::with_visibility(ops, columns, vis)
186    }
187
188    #[must_use]
189    fn append_iter_inner<'a, const VIS: bool>(
190        &mut self,
191        op: Op,
192        iter: impl IntoIterator<Item = (usize, DatumRef<'a>)>,
193    ) -> Option<StreamChunk> {
194        self.ops.push(op);
195        for (i, datum) in iter {
196            self.column_builders[i].append(datum);
197        }
198        self.vis_builder.append(VIS);
199        self.size += 1;
200
201        if let Some(max_chunk_size) = self.max_chunk_size {
202            if self.size == max_chunk_size && !op.is_update_delete() || self.size > max_chunk_size {
203                // Two situations here:
204                // 1. `self.size == max_chunk_size && op == Op::UpdateDelete`
205                //    We should wait for next `UpdateInsert` to join the chunk.
206                // 2. `self.size > max_chunk_size`
207                //    Here we assert that `self.size == max_chunk_size + 1`. It's possible that
208                //    the `Op` after `UpdateDelete` is not `UpdateInsert`, if something inconsistent
209                //    happens, we should still take the existing data.
210                self.take()
211            } else {
212                None
213            }
214        } else {
215            // unlimited
216            None
217        }
218    }
219}
220
#[cfg(test)]
mod tests {
    use super::*;
    use crate::array::{Datum, StreamChunkTestExt};
    use crate::row::OwnedRow;

    #[test]
    fn test_stream_chunk_builder() {
        let row = OwnedRow::new(vec![Datum::None, Datum::None]);
        let mut builder = StreamChunkBuilder::new(3, vec![DataType::Int32, DataType::Int32]);
        // Fill up to max_chunk_size (3): only the last append yields a chunk.
        let res = builder.append_row(Op::Delete, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::Insert, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::Insert, row.clone());
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                "  i i
                 - . .
                 + . .
                 + . ."
            ))
        );
        // Builder is empty again after yielding, so `take` returns nothing.
        let res = builder.take();
        assert!(res.is_none());

        // Invisible rows still count towards the chunk size; `D` marks the
        // invisible row in the pretty-printed chunk.
        let res = builder.append_row_invisible(Op::Delete, row.clone());
        assert!(res.is_none());
        let res = builder.append_iter(Op::Delete, row.iter().enumerate());
        assert!(res.is_none());
        let res = builder.append_record(Record::Insert {
            new_row: row.clone(),
        });
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                "  i i
                 - . . D
                 - . .
                 + . ."
            ))
        );

        // An `UpdateDelete`/`UpdateInsert` pair is kept in the same chunk, so the
        // yielded chunk may exceed max_chunk_size by one (4 rows here).
        let res = builder.append_row(Op::UpdateDelete, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::UpdateInsert, row.clone());
        assert!(res.is_none());
        let res = builder.append_record(Record::Update {
            old_row: row.clone(),
            new_row: row,
        });
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                "  i i
                U- . .
                U+ . .
                U- . .
                U+ . ."
            ))
        );
        let res = builder.take();
        assert!(res.is_none());
    }

    #[test]
    fn test_stream_chunk_builder_with_max_size_1() {
        let row = OwnedRow::new(vec![Datum::None, Datum::None]);
        let mut builder = StreamChunkBuilder::new(1, vec![DataType::Int32, DataType::Int32]);

        // With max size 1, every non-update append yields immediately.
        let res = builder.append_row(Op::Delete, row.clone());
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                "  i i
                 - . ."
            ))
        );
        let res = builder.append_row(Op::Insert, row.clone());
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                "  i i
                 + . ."
            ))
        );

        // An update pair still stays together, producing a 2-row chunk.
        let res = builder.append_record(Record::Update {
            old_row: row.clone(),
            new_row: row.clone(),
        });
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                "  i i
                U- . .
                U+ . ."
            ))
        );

        // Even if the op following `UpdateDelete` is not `UpdateInsert` (an
        // inconsistency), the builder still yields the buffered data.
        let res = builder.append_row(Op::UpdateDelete, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::UpdateDelete, row); // note this is an inconsistency
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                "  i i
                U- . .
                U- . ."
            ))
        );
    }

    #[test]
    fn test_unlimited_stream_chunk_builder() {
        let row = OwnedRow::new(vec![Datum::None, Datum::None]);
        let mut builder =
            StreamChunkBuilder::unlimited(vec![DataType::Int32, DataType::Int32], None);

        // An unlimited builder never yields from `append_*`...
        let res = builder.append_row(Op::Delete, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::Insert, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::UpdateDelete, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::UpdateInsert, row.clone());
        assert!(res.is_none());

        for _ in 0..2048 {
            let res = builder.append_record(Record::Update {
                old_row: row.clone(),
                new_row: row.clone(),
            });
            assert!(res.is_none());
        }

        // ...only `take` returns the accumulated rows (2 per update record + 4 singles).
        let res = builder.take();
        assert_eq!(res.unwrap().capacity(), 2048 * 2 + 4);
    }
}
361}