risingwave_common/array/
bytes_array.rs1use std::io::Write;
16use std::iter;
17use std::mem::{ManuallyDrop, size_of};
18
19use risingwave_common_estimate_size::EstimateSize;
20use risingwave_pb::common::Buffer;
21use risingwave_pb::common::buffer::CompressionType;
22use risingwave_pb::data::{ArrayType, PbArray};
23
24use super::{Array, ArrayBuilder, DataType};
25use crate::bitmap::{Bitmap, BitmapBuilder};
26use crate::util::iter_util::ZipEqDebug;
27
#[derive(Debug, Clone, PartialEq, Eq, EstimateSize)]
/// A column of variable-length byte strings (`bytea`).
///
/// All element bytes live back-to-back in `data`; element `i` is the slice
/// `data[offset[i]..offset[i + 1]]`.
pub struct BytesArray {
    /// Element boundaries into `data`. Holds one more entry than there are elements, so
    /// `offset[i + 1]` is the end of element `i`.
    offset: Box<[u32]>,
    /// Validity of each element (`false` = null). Null elements span an empty range in `data`.
    bitmap: Bitmap,
    /// Concatenated bytes of every element.
    data: Box<[u8]>,
}
35
36impl Array for BytesArray {
37 type Builder = BytesArrayBuilder;
38 type OwnedItem = Box<[u8]>;
39 type RefItem<'a> = &'a [u8];
40
41 unsafe fn raw_value_at_unchecked(&self, idx: usize) -> &[u8] {
42 unsafe {
43 let begin = *self.offset.get_unchecked(idx) as usize;
44 let end = *self.offset.get_unchecked(idx + 1) as usize;
45 self.data.get_unchecked(begin..end)
46 }
47 }
48
49 fn len(&self) -> usize {
50 self.offset.len() - 1
51 }
52
53 fn to_protobuf(&self) -> PbArray {
54 let offset_buffer = self
55 .offset
56 .iter()
57 .zip_eq_debug(self.null_bitmap().iter().chain(iter::once(true)))
62 .fold(
63 Vec::<u8>::with_capacity(self.data.len() * size_of::<usize>()),
64 |mut buffer, (offset, not_null)| {
65 if not_null {
68 let offset = *offset as u64;
69 buffer.extend_from_slice(&offset.to_be_bytes());
70 }
71 buffer
72 },
73 );
74
75 let data_buffer = self.data.clone();
76
77 let values = vec![
78 Buffer {
79 compression: CompressionType::None as i32,
80 body: offset_buffer,
81 },
82 Buffer {
83 compression: CompressionType::None as i32,
84 body: data_buffer.into(),
85 },
86 ];
87 let null_bitmap = self.null_bitmap().to_protobuf();
88 PbArray {
89 null_bitmap: Some(null_bitmap),
90 values,
91 array_type: ArrayType::Bytea as i32,
92 struct_array_data: None,
93 list_array_data: None,
94 }
95 }
96
97 fn null_bitmap(&self) -> &Bitmap {
98 &self.bitmap
99 }
100
101 fn into_null_bitmap(self) -> Bitmap {
102 self.bitmap
103 }
104
105 fn set_bitmap(&mut self, bitmap: Bitmap) {
106 self.bitmap = bitmap;
107 }
108
109 fn data_type(&self) -> DataType {
110 DataType::Bytea
111 }
112}
113
114impl<'a> FromIterator<Option<&'a [u8]>> for BytesArray {
115 fn from_iter<I: IntoIterator<Item = Option<&'a [u8]>>>(iter: I) -> Self {
116 let iter = iter.into_iter();
117 let mut builder = <Self as Array>::Builder::new(iter.size_hint().0);
118 for i in iter {
119 builder.append(i);
120 }
121 builder.finish()
122 }
123}
124
125impl<'a> FromIterator<&'a Option<&'a [u8]>> for BytesArray {
126 fn from_iter<I: IntoIterator<Item = &'a Option<&'a [u8]>>>(iter: I) -> Self {
127 iter.into_iter().cloned().collect()
128 }
129}
130
131impl<'a> FromIterator<&'a [u8]> for BytesArray {
132 fn from_iter<I: IntoIterator<Item = &'a [u8]>>(iter: I) -> Self {
133 iter.into_iter().map(Some).collect()
134 }
135}
136
#[derive(Debug, Clone, EstimateSize)]
/// Incrementally builds a [`BytesArray`].
pub struct BytesArrayBuilder {
    /// Element boundaries into `data`. Starts as `[0]` and gains one entry per committed
    /// element.
    offset: Vec<u32>,
    /// Validity of each committed element (`false` = null).
    bitmap: BitmapBuilder,
    /// Concatenated element bytes. While a `BytesWriter` has uncommitted partial writes,
    /// this may extend past the last entry of `offset`.
    data: Vec<u8>,
}
144
145impl ArrayBuilder for BytesArrayBuilder {
146 type ArrayType = BytesArray;
147
148 fn new(item_capacity: usize) -> Self {
154 let mut offset = Vec::with_capacity(item_capacity + 1);
155 offset.push(0);
156 Self {
157 offset,
158 data: Vec::with_capacity(0),
159 bitmap: BitmapBuilder::with_capacity(item_capacity),
160 }
161 }
162
163 fn with_type(item_capacity: usize, ty: DataType) -> Self {
164 assert_eq!(ty, DataType::Bytea);
165 Self::new(item_capacity)
166 }
167
168 fn append_n<'a>(&'a mut self, n: usize, value: Option<&'a [u8]>) {
169 match value {
170 Some(x) => {
171 self.bitmap.append_n(n, true);
172 self.data.reserve(x.len() * n);
173 self.offset.reserve(n);
174 assert!(self.data.capacity() <= u32::MAX as usize);
175 for _ in 0..n {
176 self.data.extend_from_slice(x);
177 self.offset.push(self.data.len() as u32);
178 }
179 }
180 None => {
181 self.bitmap.append_n(n, false);
182 self.offset.reserve(n);
183 for _ in 0..n {
184 self.offset.push(self.data.len() as u32);
185 }
186 }
187 }
188 }
189
190 fn append_array(&mut self, other: &BytesArray) {
191 for bit in other.bitmap.iter() {
192 self.bitmap.append(bit);
193 }
194 self.data.extend_from_slice(&other.data);
195 let start = *self.offset.last().unwrap();
196 for other_offset in &other.offset[1..] {
197 self.offset.push(*other_offset + start);
198 }
199 }
200
201 fn pop(&mut self) -> Option<()> {
202 if self.bitmap.pop().is_some() {
203 self.offset.pop().unwrap();
204 let end = self.offset.last().unwrap();
205 self.data.truncate(*end as usize);
206 Some(())
207 } else {
208 None
209 }
210 }
211
212 fn len(&self) -> usize {
213 self.bitmap.len()
214 }
215
216 fn finish(self) -> BytesArray {
217 BytesArray {
218 bitmap: self.bitmap.finish(),
219 data: self.data.into(),
220 offset: self.offset.into(),
221 }
222 }
223}
224
225impl BytesArrayBuilder {
226 pub fn writer(&mut self) -> BytesWriter<'_> {
227 BytesWriter { builder: self }
228 }
229
230 unsafe fn append_partial(&mut self, x: &[u8]) {
233 self.data.extend_from_slice(x);
234 }
235
236 fn finish_partial(&mut self) {
240 self.offset.push(self.data.len() as u32);
241 self.bitmap.append(true);
242 }
243
244 fn rollback_partial(&mut self) {
248 let &last_offset = self.offset.last().unwrap();
249 assert!(last_offset <= self.data.len() as u32);
250 self.data.truncate(last_offset as usize);
251 }
252}
253
/// Writes one element into a [`BytesArrayBuilder`] in several pieces.
///
/// Written bytes become a single non-null element when `finish` is called. If the writer is
/// dropped without `finish`, they are rolled back.
pub struct BytesWriter<'a> {
    /// The builder receiving the partial writes; borrowed exclusively for the writer's lifetime.
    builder: &'a mut BytesArrayBuilder,
}
258
259impl BytesWriter<'_> {
260 pub fn write_ref(&mut self, value: &[u8]) {
262 unsafe { self.builder.append_partial(value) }
264 }
265
266 pub fn finish(self) {
269 self.builder.finish_partial();
270 let _ = ManuallyDrop::new(self); }
272}
273
274impl Write for BytesWriter<'_> {
275 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
276 self.write_ref(buf);
277 Ok(buf.len())
278 }
279
280 fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
281 self.write_ref(buf);
282 Ok(())
283 }
284
285 fn flush(&mut self) -> std::io::Result<()> {
286 Ok(())
287 }
288}
289
290impl Drop for BytesWriter<'_> {
291 fn drop(&mut self) {
292 self.builder.rollback_partial();
294 }
295}