1use crate::array::stream_record::Record;
16use crate::array::{ArrayBuilderImpl, Op, StreamChunk};
17use crate::bitmap::BitmapBuilder;
18use crate::row::Row;
19use crate::types::{DataType, DatumRef};
20use crate::util::iter_util::ZipEqFast;
21
22pub struct StreamChunkBuilder {
24 ops: Vec<Op>,
26
27 column_builders: Vec<ArrayBuilderImpl>,
29
30 vis_builder: BitmapBuilder,
32
33 data_types: Vec<DataType>,
35
36 max_chunk_size: Option<usize>,
40
41 initial_capacity: usize,
43
44 size: usize,
46}
47
48impl Drop for StreamChunkBuilder {
49 fn drop(&mut self) {
50 if self.size != 0 {
52 tracing::warn!(
53 remaining = self.size,
54 "dropping non-empty stream chunk builder"
55 );
56 }
57 }
58}
59
/// Upper bound on the row slots preallocated by `StreamChunkBuilder::new`, so that a very large
/// `max_chunk_size` does not trigger an equally large upfront allocation.
const MAX_INITIAL_CAPACITY: usize = 4_096;
/// Row slots preallocated by `StreamChunkBuilder::unlimited` when the caller passes `None`.
const DEFAULT_INITIAL_CAPACITY: usize = 64;
62
63impl StreamChunkBuilder {
64 pub fn new(max_chunk_size: usize, data_types: Vec<DataType>) -> Self {
68 assert!(max_chunk_size > 0);
69
70 let initial_capacity = max_chunk_size.min(MAX_INITIAL_CAPACITY);
71
72 let ops = Vec::with_capacity(initial_capacity);
73 let column_builders = data_types
74 .iter()
75 .map(|datatype| datatype.create_array_builder(initial_capacity))
76 .collect();
77 let vis_builder = BitmapBuilder::with_capacity(initial_capacity);
78 Self {
79 ops,
80 column_builders,
81 data_types,
82 vis_builder,
83 max_chunk_size: Some(max_chunk_size),
84 initial_capacity,
85 size: 0,
86 }
87 }
88
89 pub fn unlimited(data_types: Vec<DataType>, initial_capacity: Option<usize>) -> Self {
92 let initial_capacity = initial_capacity.unwrap_or(DEFAULT_INITIAL_CAPACITY);
93 Self {
94 ops: Vec::with_capacity(initial_capacity),
95 column_builders: data_types
96 .iter()
97 .map(|datatype| datatype.create_array_builder(initial_capacity))
98 .collect(),
99 data_types,
100 vis_builder: BitmapBuilder::default(),
101 max_chunk_size: None,
102 initial_capacity,
103 size: 0,
104 }
105 }
106
107 pub fn build_empty(data_types: Vec<DataType>) -> StreamChunk {
108 Self::new(1, data_types).take_inner()
109 }
110
111 pub fn size(&self) -> usize {
113 self.size
114 }
115
116 #[must_use]
121 pub fn append_iter<'a>(
122 &mut self,
123 op: Op,
124 iter: impl IntoIterator<Item = (usize, DatumRef<'a>)>,
125 ) -> Option<StreamChunk> {
126 self.append_iter_inner::<true>(op, iter)
127 }
128
129 #[must_use]
131 pub fn append_row(&mut self, op: Op, row: impl Row) -> Option<StreamChunk> {
132 self.append_iter_inner::<true>(op, row.iter().enumerate())
133 }
134
135 #[must_use]
137 pub fn append_row_invisible(&mut self, op: Op, row: impl Row) -> Option<StreamChunk> {
138 self.append_iter_inner::<false>(op, row.iter().enumerate())
139 }
140
141 #[must_use]
143 pub fn append_record(&mut self, record: Record<impl Row>) -> Option<StreamChunk> {
144 match record {
145 Record::Insert { new_row } => self.append_row(Op::Insert, new_row),
146 Record::Delete { old_row } => self.append_row(Op::Delete, old_row),
147 Record::Update { old_row, new_row } => {
148 let none = self.append_row(Op::UpdateDelete, old_row);
149 assert!(none.is_none());
150 self.append_row(Op::UpdateInsert, new_row)
151 }
152 }
153 }
154
155 #[must_use]
159 pub fn take(&mut self) -> Option<StreamChunk> {
160 if self.size == 0 {
161 return None;
162 }
163 Some(self.take_inner())
164 }
165
166 fn take_inner(&mut self) -> StreamChunk {
167 self.size = 0;
168
169 let ops = std::mem::replace(&mut self.ops, Vec::with_capacity(self.initial_capacity));
170 let columns = self
171 .column_builders
172 .iter_mut()
173 .zip_eq_fast(&self.data_types)
174 .map(|(builder, datatype)| {
175 std::mem::replace(
176 builder,
177 datatype.create_array_builder(self.initial_capacity),
178 )
179 .finish()
180 })
181 .map(Into::into)
182 .collect::<Vec<_>>();
183 let vis = std::mem::take(&mut self.vis_builder).finish();
184
185 StreamChunk::with_visibility(ops, columns, vis)
186 }
187
188 #[must_use]
189 fn append_iter_inner<'a, const VIS: bool>(
190 &mut self,
191 op: Op,
192 iter: impl IntoIterator<Item = (usize, DatumRef<'a>)>,
193 ) -> Option<StreamChunk> {
194 self.ops.push(op);
195 for (i, datum) in iter {
196 self.column_builders[i].append(datum);
197 }
198 self.vis_builder.append(VIS);
199 self.size += 1;
200
201 if let Some(max_chunk_size) = self.max_chunk_size {
202 if self.size == max_chunk_size && !op.is_update_delete() || self.size > max_chunk_size {
203 self.take()
211 } else {
212 None
213 }
214 } else {
215 None
217 }
218 }
219}
220
#[cfg(test)]
mod tests {
    use super::*;
    use crate::array::{Datum, StreamChunkTestExt};
    use crate::row::OwnedRow;

    /// Chunks are emitted exactly at the limit, invisible rows are kept with a `D` marker, and
    /// a builder that just emitted a chunk has nothing left to take.
    #[test]
    fn test_stream_chunk_builder() {
        let row = OwnedRow::new(vec![Datum::None, Datum::None]);
        let mut builder = StreamChunkBuilder::new(3, vec![DataType::Int32, DataType::Int32]);
        let res = builder.append_row(Op::Delete, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::Insert, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::Insert, row.clone());
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                "  i i
                 - . .
                 + . .
                 + . ."
            ))
        );
        let res = builder.take();
        assert!(res.is_none());

        let res = builder.append_row_invisible(Op::Delete, row.clone());
        assert!(res.is_none());
        let res = builder.append_iter(Op::Delete, row.iter().enumerate());
        assert!(res.is_none());
        let res = builder.append_record(Record::Insert {
            new_row: row.clone(),
        });
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                "  i i
                 - . . D
                 - . .
                 + . ."
            ))
        );

        let res = builder.append_row(Op::UpdateDelete, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::UpdateInsert, row.clone());
        assert!(res.is_none());
        let res = builder.append_record(Record::Update {
            old_row: row.clone(),
            new_row: row,
        });
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                "  i i
                U- . .
                U+ . .
                U- . .
                U+ . ."
            ))
        );
        let res = builder.take();
        assert!(res.is_none());
    }

    /// With a limit of one row, every row is emitted immediately except an `UpdateDelete`,
    /// which waits for the next row.
    #[test]
    fn test_stream_chunk_builder_with_max_size_1() {
        let row = OwnedRow::new(vec![Datum::None, Datum::None]);
        let mut builder = StreamChunkBuilder::new(1, vec![DataType::Int32, DataType::Int32]);

        let res = builder.append_row(Op::Delete, row.clone());
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                "  i i
                 - . ."
            ))
        );
        let res = builder.append_row(Op::Insert, row.clone());
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                "  i i
                 + . ."
            ))
        );

        let res = builder.append_record(Record::Update {
            old_row: row.clone(),
            new_row: row.clone(),
        });
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                "  i i
                U- . .
                U+ . ."
            ))
        );

        let res = builder.append_row(Op::UpdateDelete, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::UpdateDelete, row);
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                "  i i
                U- . .
                U- . ."
            ))
        );
    }

    /// An `Update` pair straddling the limit stays in one chunk: the builder overshoots the
    /// limit by one row rather than separating `U-` from `U+`.
    #[test]
    fn test_stream_chunk_builder_keeps_update_pair_together() {
        let row = OwnedRow::new(vec![Datum::None, Datum::None]);
        let mut builder = StreamChunkBuilder::new(2, vec![DataType::Int32, DataType::Int32]);

        let res = builder.append_row(Op::Insert, row.clone());
        assert!(res.is_none());
        let res = builder.append_record(Record::Update {
            old_row: row.clone(),
            new_row: row,
        });
        assert_eq!(
            res,
            Some(StreamChunk::from_pretty(
                "  i i
                 + . .
                U- . .
                U+ . ."
            ))
        );
        assert!(builder.take().is_none());
    }

    /// An unlimited builder never emits a chunk on append; everything comes out of `take`.
    #[test]
    fn test_unlimited_stream_chunk_builder() {
        let row = OwnedRow::new(vec![Datum::None, Datum::None]);
        let mut builder =
            StreamChunkBuilder::unlimited(vec![DataType::Int32, DataType::Int32], None);

        let res = builder.append_row(Op::Delete, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::Insert, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::UpdateDelete, row.clone());
        assert!(res.is_none());
        let res = builder.append_row(Op::UpdateInsert, row.clone());
        assert!(res.is_none());

        for _ in 0..2048 {
            let res = builder.append_record(Record::Update {
                old_row: row.clone(),
                new_row: row.clone(),
            });
            assert!(res.is_none());
        }

        let res = builder.take();
        assert_eq!(res.unwrap().capacity(), 2048 * 2 + 4);
    }

    /// `build_empty` yields a chunk with no rows.
    #[test]
    fn test_build_empty() {
        let chunk = StreamChunkBuilder::build_empty(vec![DataType::Int32, DataType::Int32]);
        assert_eq!(chunk.capacity(), 0);
    }
}