risingwave_common/array/
stream_chunk_builder.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15use crate::array::stream_record::Record;
16use crate::array::{ArrayBuilderImpl, Op, StreamChunk};
17use crate::bitmap::BitmapBuilder;
18use crate::row::Row;
19use crate::types::{DataType, DatumRef};
20use crate::util::iter_util::ZipEqFast;
21
/// Build stream chunks with fixed chunk size from rows or records.
pub struct StreamChunkBuilder {
    /// Operations (`Insert` / `Delete` / `UpdateDelete` / `UpdateInsert`) of the pending rows.
    ops: Vec<Op>,

    /// Array builders, one per column, buffering the pending rows' data.
    column_builders: Vec<ArrayBuilderImpl>,

    /// Visibility bit of each pending row.
    vis_builder: BitmapBuilder,

    /// Data types of columns, used to create fresh array builders after a chunk is taken.
    data_types: Vec<DataType>,

    /// Max number of rows in a chunk. When it's `Some(n)`, the chunk builder will, if necessary,
    /// yield a chunk of at most `n` rows when appending records — except that a chunk ending
    /// with an `UpdateDelete`/`UpdateInsert` pair may contain `n + 1` rows, since the pair is
    /// never split across chunks (see `append_iter_inner`).
    /// When it's `None`, the chunk builder will yield chunks only when `take` is called.
    max_chunk_size: Option<usize>,

    /// The initial capacity of `ops` and `ArrayBuilder`s.
    initial_capacity: usize,

    /// Number of currently pending rows.
    size: usize,
}
47
48impl Drop for StreamChunkBuilder {
49    fn drop(&mut self) {
50        // Possible to fail when async task gets cancelled.
51        if self.size != 0 {
52            tracing::warn!(
53                remaining = self.size,
54                "dropping non-empty stream chunk builder"
55            );
56        }
57    }
58}
59
/// Upper bound on the pre-allocated capacity in `new`, so that a very large
/// `max_chunk_size` does not reserve excessive memory up front.
const MAX_INITIAL_CAPACITY: usize = 4096;
/// Pre-allocated capacity used by `unlimited` when the caller does not specify one.
const DEFAULT_INITIAL_CAPACITY: usize = 64;
62
63impl StreamChunkBuilder {
64    /// Create a new `StreamChunkBuilder` with a fixed max chunk size.
65    /// Note that in the case of ending with `Update`, the builder may yield a chunk with size
66    /// `max_chunk_size + 1`.
67    pub fn new(max_chunk_size: usize, data_types: Vec<DataType>) -> Self {
68        assert!(max_chunk_size > 0);
69
70        let initial_capacity = max_chunk_size.min(MAX_INITIAL_CAPACITY);
71
72        let ops = Vec::with_capacity(initial_capacity);
73        let column_builders = data_types
74            .iter()
75            .map(|datatype| datatype.create_array_builder(initial_capacity))
76            .collect();
77        let vis_builder = BitmapBuilder::with_capacity(initial_capacity);
78        Self {
79            ops,
80            column_builders,
81            data_types,
82            vis_builder,
83            max_chunk_size: Some(max_chunk_size),
84            initial_capacity,
85            size: 0,
86        }
87    }
88
89    /// Create a new `StreamChunkBuilder` with unlimited chunk size.
90    /// The builder will only yield chunks when `take` is called.
91    pub fn unlimited(data_types: Vec<DataType>, initial_capacity: Option<usize>) -> Self {
92        let initial_capacity = initial_capacity.unwrap_or(DEFAULT_INITIAL_CAPACITY);
93        Self {
94            ops: Vec::with_capacity(initial_capacity),
95            column_builders: data_types
96                .iter()
97                .map(|datatype| datatype.create_array_builder(initial_capacity))
98                .collect(),
99            data_types,
100            vis_builder: BitmapBuilder::default(),
101            max_chunk_size: None,
102            initial_capacity,
103            size: 0,
104        }
105    }
106
107    /// Get the current number of rows in the builder.
108    pub fn size(&self) -> usize {
109        self.size
110    }
111
112    /// Append an iterator of output index and datum to the builder, return a chunk if the builder
113    /// is full.
114    ///
115    /// Note: the caller must ensure that each column occurs exactly once in `iter`.
116    #[must_use]
117    pub fn append_iter<'a>(
118        &mut self,
119        op: Op,
120        iter: impl IntoIterator<Item = (usize, DatumRef<'a>)>,
121    ) -> Option<StreamChunk> {
122        self.append_iter_inner::<true>(op, iter)
123    }
124
125    /// Append a row to the builder, return a chunk if the builder is full.
126    #[must_use]
127    pub fn append_row(&mut self, op: Op, row: impl Row) -> Option<StreamChunk> {
128        self.append_iter_inner::<true>(op, row.iter().enumerate())
129    }
130
131    /// Append an invisible row to the builder, return a chunk if the builder is full.
132    #[must_use]
133    pub fn append_row_invisible(&mut self, op: Op, row: impl Row) -> Option<StreamChunk> {
134        self.append_iter_inner::<false>(op, row.iter().enumerate())
135    }
136
137    /// Append a record to the builder, return a chunk if the builder is full.
138    #[must_use]
139    pub fn append_record(&mut self, record: Record<impl Row>) -> Option<StreamChunk> {
140        match record {
141            Record::Insert { new_row } => {
142                self.append_iter_inner::<true>(Op::Insert, new_row.iter().enumerate())
143            }
144            Record::Delete { old_row } => {
145                self.append_iter_inner::<true>(Op::Delete, old_row.iter().enumerate())
146            }
147            Record::Update { old_row, new_row } => {
148                let none =
149                    self.append_iter_inner::<true>(Op::UpdateDelete, old_row.iter().enumerate());
150                assert!(none.is_none());
151                self.append_iter_inner::<true>(Op::UpdateInsert, new_row.iter().enumerate())
152            }
153        }
154    }
155
156    /// Take all the pending data and return a chunk. If there is no pending data, return `None`.
157    /// Note that if this is an unlimited chunk builder, the only way to get a chunk is to call
158    /// `take`.
159    #[must_use]
160    pub fn take(&mut self) -> Option<StreamChunk> {
161        if self.size == 0 {
162            return None;
163        }
164        self.size = 0;
165
166        let ops = std::mem::replace(&mut self.ops, Vec::with_capacity(self.initial_capacity));
167        let columns = self
168            .column_builders
169            .iter_mut()
170            .zip_eq_fast(&self.data_types)
171            .map(|(builder, datatype)| {
172                std::mem::replace(
173                    builder,
174                    datatype.create_array_builder(self.initial_capacity),
175                )
176                .finish()
177            })
178            .map(Into::into)
179            .collect::<Vec<_>>();
180        let vis = std::mem::take(&mut self.vis_builder).finish();
181
182        Some(StreamChunk::with_visibility(ops, columns, vis))
183    }
184
185    #[must_use]
186    fn append_iter_inner<'a, const VIS: bool>(
187        &mut self,
188        op: Op,
189        iter: impl IntoIterator<Item = (usize, DatumRef<'a>)>,
190    ) -> Option<StreamChunk> {
191        self.ops.push(op);
192        for (i, datum) in iter {
193            self.column_builders[i].append(datum);
194        }
195        self.vis_builder.append(VIS);
196        self.size += 1;
197
198        if let Some(max_chunk_size) = self.max_chunk_size {
199            if self.size == max_chunk_size && !op.is_update_delete() || self.size > max_chunk_size {
200                // Two situations here:
201                // 1. `self.size == max_chunk_size && op == Op::UpdateDelete`
202                //    We should wait for next `UpdateInsert` to join the chunk.
203                // 2. `self.size > max_chunk_size`
204                //    Here we assert that `self.size == max_chunk_size + 1`. It's possible that
205                //    the `Op` after `UpdateDelete` is not `UpdateInsert`, if something inconsistent
206                //    happens, we should still take the existing data.
207                self.take()
208            } else {
209                None
210            }
211        } else {
212            // unlimited
213            None
214        }
215    }
216}
217
#[cfg(test)]
mod tests {
    use super::*;
    use crate::array::{Datum, StreamChunkTestExt};
    use crate::row::OwnedRow;

    #[test]
    fn test_stream_chunk_builder() {
        let row = OwnedRow::new(vec![Datum::None, Datum::None]);
        let mut builder = StreamChunkBuilder::new(3, vec![DataType::Int32, DataType::Int32]);

        // The first two rows are buffered; the third fills the chunk and flushes it.
        assert!(builder.append_row(Op::Delete, row.clone()).is_none());
        assert!(builder.append_row(Op::Insert, row.clone()).is_none());
        let chunk = builder.append_row(Op::Insert, row.clone());
        assert_eq!(
            chunk,
            Some(StreamChunk::from_pretty(
                "  i i
                 - . .
                 + . .
                 + . ."
            ))
        );
        // Nothing is left pending after the flush.
        assert!(builder.take().is_none());

        // Invisible rows still count towards the chunk size (marked `D` in pretty form).
        assert!(builder.append_row_invisible(Op::Delete, row.clone()).is_none());
        assert!(
            builder
                .append_iter(Op::Delete, row.clone().iter().enumerate())
                .is_none()
        );
        let chunk = builder.append_record(Record::Insert {
            new_row: row.clone(),
        });
        assert_eq!(
            chunk,
            Some(StreamChunk::from_pretty(
                "  i i
                 - . . D
                 - . .
                 + . ."
            ))
        );

        // An `UpdateDelete` reaching the size limit is held back until its `UpdateInsert`
        // arrives, so this chunk carries 4 rows instead of 3.
        assert!(builder.append_row(Op::UpdateDelete, row.clone()).is_none());
        assert!(builder.append_row(Op::UpdateInsert, row.clone()).is_none());
        let chunk = builder.append_record(Record::Update {
            old_row: row.clone(),
            new_row: row.clone(),
        });
        assert_eq!(
            chunk,
            Some(StreamChunk::from_pretty(
                "  i i
                U- . .
                U+ . .
                U- . .
                U+ . ."
            ))
        );
        assert!(builder.take().is_none());
    }

    #[test]
    fn test_stream_chunk_builder_with_max_size_1() {
        let row = OwnedRow::new(vec![Datum::None, Datum::None]);
        let mut builder = StreamChunkBuilder::new(1, vec![DataType::Int32, DataType::Int32]);

        // With max size 1, every single row immediately produces a chunk.
        let chunk = builder.append_row(Op::Delete, row.clone());
        assert_eq!(
            chunk,
            Some(StreamChunk::from_pretty(
                "  i i
                 - . ."
            ))
        );
        let chunk = builder.append_row(Op::Insert, row.clone());
        assert_eq!(
            chunk,
            Some(StreamChunk::from_pretty(
                "  i i
                 + . ."
            ))
        );

        // An update pair may exceed the max size by one so that it is never split.
        let chunk = builder.append_record(Record::Update {
            old_row: row.clone(),
            new_row: row.clone(),
        });
        assert_eq!(
            chunk,
            Some(StreamChunk::from_pretty(
                "  i i
                U- . .
                U+ . ."
            ))
        );

        // A dangling `UpdateDelete` is held back once...
        assert!(builder.append_row(Op::UpdateDelete, row.clone()).is_none());
        // ...but a second `UpdateDelete` (an inconsistency) still flushes the pending data.
        let chunk = builder.append_row(Op::UpdateDelete, row.clone());
        assert_eq!(
            chunk,
            Some(StreamChunk::from_pretty(
                "  i i
                U- . .
                U- . ."
            ))
        );
    }

    #[test]
    fn test_unlimited_stream_chunk_builder() {
        let row = OwnedRow::new(vec![Datum::None, Datum::None]);
        let mut builder =
            StreamChunkBuilder::unlimited(vec![DataType::Int32, DataType::Int32], None);

        // An unlimited builder never yields from appending, regardless of volume.
        assert!(builder.append_row(Op::Delete, row.clone()).is_none());
        assert!(builder.append_row(Op::Insert, row.clone()).is_none());
        assert!(builder.append_row(Op::UpdateDelete, row.clone()).is_none());
        assert!(builder.append_row(Op::UpdateInsert, row.clone()).is_none());

        for _ in 0..2048 {
            assert!(
                builder
                    .append_record(Record::Update {
                        old_row: row.clone(),
                        new_row: row.clone(),
                    })
                    .is_none()
            );
        }

        // Everything comes out in a single chunk: 4 single rows + 2048 update pairs.
        let chunk = builder.take();
        assert_eq!(chunk.unwrap().capacity(), 2048 * 2 + 4);
    }
}