1use crate::array::stream_record::Record;
16use crate::array::{ArrayBuilderImpl, Op, StreamChunk};
17use crate::bitmap::BitmapBuilder;
18use crate::row::Row;
19use crate::types::{DataType, DatumRef};
20use crate::util::iter_util::ZipEqFast;
21
22pub struct StreamChunkBuilder {
24 ops: Vec<Op>,
26
27 column_builders: Vec<ArrayBuilderImpl>,
29
30 vis_builder: BitmapBuilder,
32
33 data_types: Vec<DataType>,
35
36 max_chunk_size: Option<usize>,
40
41 initial_capacity: usize,
43
44 size: usize,
46}
47
48impl Drop for StreamChunkBuilder {
49 fn drop(&mut self) {
50 if self.size != 0 {
52 tracing::warn!(
53 remaining = self.size,
54 "dropping non-empty stream chunk builder"
55 );
56 }
57 }
58}
59
/// Upper bound on the buffer capacity preallocated by a size-limited builder, so that a huge
/// `max_chunk_size` does not translate into a huge upfront allocation.
const MAX_INITIAL_CAPACITY: usize = 4_096;
/// Buffer capacity used by an unlimited builder when the caller does not provide one.
const DEFAULT_INITIAL_CAPACITY: usize = 64;
62
63impl StreamChunkBuilder {
64 pub fn new(max_chunk_size: usize, data_types: Vec<DataType>) -> Self {
68 assert!(max_chunk_size > 0);
69
70 let initial_capacity = max_chunk_size.min(MAX_INITIAL_CAPACITY);
71
72 let ops = Vec::with_capacity(initial_capacity);
73 let column_builders = data_types
74 .iter()
75 .map(|datatype| datatype.create_array_builder(initial_capacity))
76 .collect();
77 let vis_builder = BitmapBuilder::with_capacity(initial_capacity);
78 Self {
79 ops,
80 column_builders,
81 data_types,
82 vis_builder,
83 max_chunk_size: Some(max_chunk_size),
84 initial_capacity,
85 size: 0,
86 }
87 }
88
89 pub fn unlimited(data_types: Vec<DataType>, initial_capacity: Option<usize>) -> Self {
92 let initial_capacity = initial_capacity.unwrap_or(DEFAULT_INITIAL_CAPACITY);
93 Self {
94 ops: Vec::with_capacity(initial_capacity),
95 column_builders: data_types
96 .iter()
97 .map(|datatype| datatype.create_array_builder(initial_capacity))
98 .collect(),
99 data_types,
100 vis_builder: BitmapBuilder::default(),
101 max_chunk_size: None,
102 initial_capacity,
103 size: 0,
104 }
105 }
106
107 pub fn size(&self) -> usize {
109 self.size
110 }
111
112 #[must_use]
117 pub fn append_iter<'a>(
118 &mut self,
119 op: Op,
120 iter: impl IntoIterator<Item = (usize, DatumRef<'a>)>,
121 ) -> Option<StreamChunk> {
122 self.append_iter_inner::<true>(op, iter)
123 }
124
125 #[must_use]
127 pub fn append_row(&mut self, op: Op, row: impl Row) -> Option<StreamChunk> {
128 self.append_iter_inner::<true>(op, row.iter().enumerate())
129 }
130
131 #[must_use]
133 pub fn append_row_invisible(&mut self, op: Op, row: impl Row) -> Option<StreamChunk> {
134 self.append_iter_inner::<false>(op, row.iter().enumerate())
135 }
136
137 #[must_use]
139 pub fn append_record(&mut self, record: Record<impl Row>) -> Option<StreamChunk> {
140 match record {
141 Record::Insert { new_row } => {
142 self.append_iter_inner::<true>(Op::Insert, new_row.iter().enumerate())
143 }
144 Record::Delete { old_row } => {
145 self.append_iter_inner::<true>(Op::Delete, old_row.iter().enumerate())
146 }
147 Record::Update { old_row, new_row } => {
148 let none =
149 self.append_iter_inner::<true>(Op::UpdateDelete, old_row.iter().enumerate());
150 assert!(none.is_none());
151 self.append_iter_inner::<true>(Op::UpdateInsert, new_row.iter().enumerate())
152 }
153 }
154 }
155
156 #[must_use]
160 pub fn take(&mut self) -> Option<StreamChunk> {
161 if self.size == 0 {
162 return None;
163 }
164 self.size = 0;
165
166 let ops = std::mem::replace(&mut self.ops, Vec::with_capacity(self.initial_capacity));
167 let columns = self
168 .column_builders
169 .iter_mut()
170 .zip_eq_fast(&self.data_types)
171 .map(|(builder, datatype)| {
172 std::mem::replace(
173 builder,
174 datatype.create_array_builder(self.initial_capacity),
175 )
176 .finish()
177 })
178 .map(Into::into)
179 .collect::<Vec<_>>();
180 let vis = std::mem::take(&mut self.vis_builder).finish();
181
182 Some(StreamChunk::with_visibility(ops, columns, vis))
183 }
184
185 #[must_use]
186 fn append_iter_inner<'a, const VIS: bool>(
187 &mut self,
188 op: Op,
189 iter: impl IntoIterator<Item = (usize, DatumRef<'a>)>,
190 ) -> Option<StreamChunk> {
191 self.ops.push(op);
192 for (i, datum) in iter {
193 self.column_builders[i].append(datum);
194 }
195 self.vis_builder.append(VIS);
196 self.size += 1;
197
198 if let Some(max_chunk_size) = self.max_chunk_size {
199 if self.size == max_chunk_size && !op.is_update_delete() || self.size > max_chunk_size {
200 self.take()
208 } else {
209 None
210 }
211 } else {
212 None
214 }
215 }
216}
217
#[cfg(test)]
mod tests {
    use super::*;
    use crate::array::{Datum, StreamChunkTestExt};
    use crate::row::OwnedRow;

    #[test]
    fn test_stream_chunk_builder() {
        let row = OwnedRow::new(vec![Datum::None, Datum::None]);
        let mut builder = StreamChunkBuilder::new(3, vec![DataType::Int32, DataType::Int32]);
        let res = builder.append_row(Op::Delete, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::Insert, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::Insert, row.clone());
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                " i i
                - . .
                + . .
                + . ."
            ))
        );
        let res = builder.take();
        assert!(res.is_none());

        let res = builder.append_row_invisible(Op::Delete, row.clone());
        assert!(res.is_none());
        let res = builder.append_iter(Op::Delete, row.clone().iter().enumerate());
        assert!(res.is_none());
        let res = builder.append_record(Record::Insert {
            new_row: row.clone(),
        });
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                " i i
                - . . D
                - . .
                + . ."
            ))
        );

        let res = builder.append_row(Op::UpdateDelete, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::UpdateInsert, row.clone());
        assert!(res.is_none());
        let res = builder.append_record(Record::Update {
            old_row: row.clone(),
            new_row: row.clone(),
        });
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                " i i
                U- . .
                U+ . .
                U- . .
                U+ . ."
            ))
        );
        let res = builder.take();
        assert!(res.is_none());
    }

    #[test]
    fn test_stream_chunk_builder_with_max_size_1() {
        let row = OwnedRow::new(vec![Datum::None, Datum::None]);
        let mut builder = StreamChunkBuilder::new(1, vec![DataType::Int32, DataType::Int32]);

        let res = builder.append_row(Op::Delete, row.clone());
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                " i i
                - . ."
            ))
        );
        let res = builder.append_row(Op::Insert, row.clone());
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                " i i
                + . ."
            ))
        );

        let res = builder.append_record(Record::Update {
            old_row: row.clone(),
            new_row: row.clone(),
        });
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                " i i
                U- . .
                U+ . ."
            ))
        );

        let res = builder.append_row(Op::UpdateDelete, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::UpdateDelete, row.clone());
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                " i i
                U- . .
                U- . ."
            ))
        );
    }

    #[test]
    fn test_unlimited_stream_chunk_builder() {
        let row = OwnedRow::new(vec![Datum::None, Datum::None]);
        let mut builder =
            StreamChunkBuilder::unlimited(vec![DataType::Int32, DataType::Int32], None);

        let res = builder.append_row(Op::Delete, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::Insert, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::UpdateDelete, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::UpdateInsert, row.clone());
        assert!(res.is_none());

        for _ in 0..2048 {
            let res = builder.append_record(Record::Update {
                old_row: row.clone(),
                new_row: row.clone(),
            });
            assert!(res.is_none());
        }

        let res = builder.take();
        assert_eq!(res.unwrap().capacity(), 2048 * 2 + 4);
    }

    #[test]
    fn test_unlimited_stream_chunk_builder_reuse_after_take() {
        let row = OwnedRow::new(vec![Datum::None, Datum::None]);
        let mut builder =
            StreamChunkBuilder::unlimited(vec![DataType::Int32, DataType::Int32], Some(2));

        // Nothing buffered yet.
        assert!(builder.take().is_none());

        assert!(builder.append_row(Op::Insert, row.clone()).is_none());
        assert!(builder
            .append_row_invisible(Op::Delete, row.clone())
            .is_none());
        assert_eq!(builder.size(), 2);
        assert_eq!(
            builder.take(),
            Some(StreamChunk::from_pretty(
                " i i
                + . .
                - . . D"
            ))
        );
        assert_eq!(builder.size(), 0);

        // The builder must be fully reset, so the next chunk inherits neither rows nor
        // visibility bits from the previous one.
        assert!(builder
            .append_row_invisible(Op::Insert, row.clone())
            .is_none());
        assert!(builder.append_row(Op::Delete, row.clone()).is_none());
        assert_eq!(
            builder.take(),
            Some(StreamChunk::from_pretty(
                " i i
                + . . D
                - . ."
            ))
        );
        assert!(builder.take().is_none());
    }
}