1use crate::array::stream_record::Record;
16use crate::array::{ArrayBuilderImpl, Op, StreamChunk};
17use crate::bitmap::BitmapBuilder;
18use crate::row::Row;
19use crate::types::{DataType, DatumRef};
20use crate::util::iter_util::ZipEqFast;
21
/// Incrementally builds [`StreamChunk`]s row by row.
///
/// A builder created with [`StreamChunkBuilder::new`] hands back a finished chunk from
/// its `append_*` methods once it is full. A builder created with
/// [`StreamChunkBuilder::unlimited`] never does, and chunks are only obtained through
/// [`StreamChunkBuilder::take`].
pub struct StreamChunkBuilder {
    /// Operation of each buffered row (one entry is pushed per appended row).
    ops: Vec<Op>,

    /// One array builder per column, created from `data_types`.
    column_builders: Vec<ArrayBuilderImpl>,

    /// Visibility bit of each buffered row.
    vis_builder: BitmapBuilder,

    /// Column types, kept so that fresh column builders can be created after each flush.
    data_types: Vec<DataType>,

    /// Maximum number of rows per emitted chunk, or `None` for an unlimited builder.
    /// A chunk may hold one extra row so that an `UpdateDelete`/`UpdateInsert` pair
    /// is not split across two chunks.
    max_chunk_size: Option<usize>,

    /// Capacity used when (re)allocating the op, column and visibility buffers.
    initial_capacity: usize,

    /// Number of rows currently buffered (reset to 0 whenever a chunk is taken).
    size: usize,
}
47
48impl Drop for StreamChunkBuilder {
49 fn drop(&mut self) {
50 if self.size != 0 {
52 tracing::warn!(
53 remaining = self.size,
54 "dropping non-empty stream chunk builder"
55 );
56 }
57 }
58}
59
/// Upper bound on the number of rows [`StreamChunkBuilder::new`] preallocates for,
/// regardless of how large `max_chunk_size` is.
const MAX_INITIAL_CAPACITY: usize = 4_096;
/// Preallocated row capacity used by [`StreamChunkBuilder::unlimited`] when the caller
/// passes `None` as the initial capacity.
const DEFAULT_INITIAL_CAPACITY: usize = 64;
62
63impl StreamChunkBuilder {
64 pub fn new(max_chunk_size: usize, data_types: Vec<DataType>) -> Self {
68 assert!(max_chunk_size > 0);
69
70 let initial_capacity = max_chunk_size.min(MAX_INITIAL_CAPACITY);
71
72 let ops = Vec::with_capacity(initial_capacity);
73 let column_builders = data_types
74 .iter()
75 .map(|datatype| datatype.create_array_builder(initial_capacity))
76 .collect();
77 let vis_builder = BitmapBuilder::with_capacity(initial_capacity);
78 Self {
79 ops,
80 column_builders,
81 data_types,
82 vis_builder,
83 max_chunk_size: Some(max_chunk_size),
84 initial_capacity,
85 size: 0,
86 }
87 }
88
89 pub fn unlimited(data_types: Vec<DataType>, initial_capacity: Option<usize>) -> Self {
92 let initial_capacity = initial_capacity.unwrap_or(DEFAULT_INITIAL_CAPACITY);
93 Self {
94 ops: Vec::with_capacity(initial_capacity),
95 column_builders: data_types
96 .iter()
97 .map(|datatype| datatype.create_array_builder(initial_capacity))
98 .collect(),
99 data_types,
100 vis_builder: BitmapBuilder::default(),
101 max_chunk_size: None,
102 initial_capacity,
103 size: 0,
104 }
105 }
106
107 pub fn build_empty(data_types: Vec<DataType>) -> StreamChunk {
108 Self::new(1, data_types).take_inner()
109 }
110
111 pub fn size(&self) -> usize {
113 self.size
114 }
115
116 #[must_use]
121 pub fn append_iter<'a>(
122 &mut self,
123 op: Op,
124 iter: impl IntoIterator<Item = (usize, DatumRef<'a>)>,
125 ) -> Option<StreamChunk> {
126 self.append_iter_inner::<true>(op, iter)
127 }
128
129 #[must_use]
131 pub fn append_row(&mut self, op: Op, row: impl Row) -> Option<StreamChunk> {
132 self.append_iter_inner::<true>(op, row.iter().enumerate())
133 }
134
135 #[must_use]
137 pub fn append_row_invisible(&mut self, op: Op, row: impl Row) -> Option<StreamChunk> {
138 self.append_iter_inner::<false>(op, row.iter().enumerate())
139 }
140
141 #[must_use]
143 pub fn append_record(&mut self, record: Record<impl Row>) -> Option<StreamChunk> {
144 match record {
145 Record::Insert { new_row } => {
146 self.append_iter_inner::<true>(Op::Insert, new_row.iter().enumerate())
147 }
148 Record::Delete { old_row } => {
149 self.append_iter_inner::<true>(Op::Delete, old_row.iter().enumerate())
150 }
151 Record::Update { old_row, new_row } => {
152 let none =
153 self.append_iter_inner::<true>(Op::UpdateDelete, old_row.iter().enumerate());
154 assert!(none.is_none());
155 self.append_iter_inner::<true>(Op::UpdateInsert, new_row.iter().enumerate())
156 }
157 }
158 }
159
160 #[must_use]
164 pub fn take(&mut self) -> Option<StreamChunk> {
165 if self.size == 0 {
166 return None;
167 }
168 Some(self.take_inner())
169 }
170
171 fn take_inner(&mut self) -> StreamChunk {
172 self.size = 0;
173
174 let ops = std::mem::replace(&mut self.ops, Vec::with_capacity(self.initial_capacity));
175 let columns = self
176 .column_builders
177 .iter_mut()
178 .zip_eq_fast(&self.data_types)
179 .map(|(builder, datatype)| {
180 std::mem::replace(
181 builder,
182 datatype.create_array_builder(self.initial_capacity),
183 )
184 .finish()
185 })
186 .map(Into::into)
187 .collect::<Vec<_>>();
188 let vis = std::mem::take(&mut self.vis_builder).finish();
189
190 StreamChunk::with_visibility(ops, columns, vis)
191 }
192
193 #[must_use]
194 fn append_iter_inner<'a, const VIS: bool>(
195 &mut self,
196 op: Op,
197 iter: impl IntoIterator<Item = (usize, DatumRef<'a>)>,
198 ) -> Option<StreamChunk> {
199 self.ops.push(op);
200 for (i, datum) in iter {
201 self.column_builders[i].append(datum);
202 }
203 self.vis_builder.append(VIS);
204 self.size += 1;
205
206 if let Some(max_chunk_size) = self.max_chunk_size {
207 if self.size == max_chunk_size && !op.is_update_delete() || self.size > max_chunk_size {
208 self.take()
216 } else {
217 None
218 }
219 } else {
220 None
222 }
223 }
224}
225
#[cfg(test)]
mod tests {
    use super::*;
    use crate::array::{Datum, StreamChunkTestExt};
    use crate::row::OwnedRow;

    #[test]
    fn test_stream_chunk_builder() {
        let row = OwnedRow::new(vec![Datum::None, Datum::None]);
        let mut builder = StreamChunkBuilder::new(3, vec![DataType::Int32, DataType::Int32]);
        let res = builder.append_row(Op::Delete, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::Insert, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::Insert, row.clone());
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                " i i
                - . .
                + . .
                + . ."
            ))
        );
        let res = builder.take();
        assert!(res.is_none());

        let res = builder.append_row_invisible(Op::Delete, row.clone());
        assert!(res.is_none());
        let res = builder.append_iter(Op::Delete, row.clone().iter().enumerate());
        assert!(res.is_none());
        let res = builder.append_record(Record::Insert {
            new_row: row.clone(),
        });
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                " i i
                - . . D
                - . .
                + . ."
            ))
        );

        let res = builder.append_row(Op::UpdateDelete, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::UpdateInsert, row.clone());
        assert!(res.is_none());
        let res = builder.append_record(Record::Update {
            old_row: row.clone(),
            new_row: row.clone(),
        });
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                " i i
                U- . .
                U+ . .
                U- . .
                U+ . ."
            ))
        );
        let res = builder.take();
        assert!(res.is_none());
    }

    #[test]
    fn test_stream_chunk_builder_with_max_size_1() {
        let row = OwnedRow::new(vec![Datum::None, Datum::None]);
        let mut builder = StreamChunkBuilder::new(1, vec![DataType::Int32, DataType::Int32]);

        let res = builder.append_row(Op::Delete, row.clone());
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                " i i
                - . ."
            ))
        );
        let res = builder.append_row(Op::Insert, row.clone());
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                " i i
                + . ."
            ))
        );

        let res = builder.append_record(Record::Update {
            old_row: row.clone(),
            new_row: row.clone(),
        });
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                " i i
                U- . .
                U+ . ."
            ))
        );

        let res = builder.append_row(Op::UpdateDelete, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::UpdateDelete, row.clone());
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                " i i
                U- . .
                U- . ."
            ))
        );
    }

    #[test]
    fn test_unlimited_stream_chunk_builder() {
        let row = OwnedRow::new(vec![Datum::None, Datum::None]);
        let mut builder =
            StreamChunkBuilder::unlimited(vec![DataType::Int32, DataType::Int32], None);

        let res = builder.append_row(Op::Delete, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::Insert, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::UpdateDelete, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::UpdateInsert, row.clone());
        assert!(res.is_none());

        for _ in 0..2048 {
            let res = builder.append_record(Record::Update {
                old_row: row.clone(),
                new_row: row.clone(),
            });
            assert!(res.is_none());
        }

        let res = builder.take();
        assert_eq!(res.unwrap().capacity(), 2048 * 2 + 4);
    }

    #[test]
    fn test_unlimited_with_explicit_capacity() {
        let row = OwnedRow::new(vec![Datum::None]);
        let mut builder = StreamChunkBuilder::unlimited(vec![DataType::Int32], Some(2));

        // Appending past the preallocated capacity must still never emit a chunk.
        for _ in 0..5 {
            assert!(builder.append_row(Op::Insert, row.clone()).is_none());
        }
        assert_eq!(builder.size(), 5);

        assert_eq!(builder.take().unwrap().capacity(), 5);
        assert_eq!(builder.size(), 0);
        assert!(builder.take().is_none());
    }

    #[test]
    fn test_build_empty_and_size() {
        let chunk = StreamChunkBuilder::build_empty(vec![DataType::Int32, DataType::Int32]);
        assert_eq!(chunk.capacity(), 0);

        let row = OwnedRow::new(vec![Datum::None, Datum::None]);
        let mut builder = StreamChunkBuilder::new(2, vec![DataType::Int32, DataType::Int32]);
        assert_eq!(builder.size(), 0);
        assert!(builder.take().is_none());

        let res = builder.append_row(Op::Insert, row.clone());
        assert!(res.is_none());
        assert_eq!(builder.size(), 1);

        let res = builder.append_row(Op::Insert, row.clone());
        assert_eq!(res.unwrap().capacity(), 2);
        assert_eq!(builder.size(), 0);
    }
}