risingwave_common/array/
bytes_array.rs1use std::iter;
16use std::mem::size_of;
17
18use risingwave_common_estimate_size::EstimateSize;
19use risingwave_pb::common::Buffer;
20use risingwave_pb::common::buffer::CompressionType;
21use risingwave_pb::data::{ArrayType, PbArray};
22
23use super::{Array, ArrayBuilder, DataType};
24use crate::bitmap::{Bitmap, BitmapBuilder};
25use crate::util::iter_util::ZipEqDebug;
26
27#[derive(Debug, Clone, PartialEq, Eq, EstimateSize)]
29pub struct BytesArray {
30 offset: Box<[u32]>,
31 bitmap: Bitmap,
32 data: Box<[u8]>,
33}
34
35impl Array for BytesArray {
36 type Builder = BytesArrayBuilder;
37 type OwnedItem = Box<[u8]>;
38 type RefItem<'a> = &'a [u8];
39
40 unsafe fn raw_value_at_unchecked(&self, idx: usize) -> &[u8] {
41 unsafe {
42 let begin = *self.offset.get_unchecked(idx) as usize;
43 let end = *self.offset.get_unchecked(idx + 1) as usize;
44 self.data.get_unchecked(begin..end)
45 }
46 }
47
48 fn len(&self) -> usize {
49 self.offset.len() - 1
50 }
51
52 fn to_protobuf(&self) -> PbArray {
53 let offset_buffer = self
54 .offset
55 .iter()
56 .zip_eq_debug(self.null_bitmap().iter().chain(iter::once(true)))
61 .fold(
62 Vec::<u8>::with_capacity(self.data.len() * size_of::<usize>()),
63 |mut buffer, (offset, not_null)| {
64 if not_null {
67 let offset = *offset as u64;
68 buffer.extend_from_slice(&offset.to_be_bytes());
69 }
70 buffer
71 },
72 );
73
74 let data_buffer = self.data.clone();
75
76 let values = vec![
77 Buffer {
78 compression: CompressionType::None as i32,
79 body: offset_buffer,
80 },
81 Buffer {
82 compression: CompressionType::None as i32,
83 body: data_buffer.into(),
84 },
85 ];
86 let null_bitmap = self.null_bitmap().to_protobuf();
87 PbArray {
88 null_bitmap: Some(null_bitmap),
89 values,
90 array_type: ArrayType::Bytea as i32,
91 struct_array_data: None,
92 list_array_data: None,
93 }
94 }
95
96 fn null_bitmap(&self) -> &Bitmap {
97 &self.bitmap
98 }
99
100 fn into_null_bitmap(self) -> Bitmap {
101 self.bitmap
102 }
103
104 fn set_bitmap(&mut self, bitmap: Bitmap) {
105 self.bitmap = bitmap;
106 }
107
108 fn data_type(&self) -> DataType {
109 DataType::Bytea
110 }
111}
112
113impl<'a> FromIterator<Option<&'a [u8]>> for BytesArray {
114 fn from_iter<I: IntoIterator<Item = Option<&'a [u8]>>>(iter: I) -> Self {
115 let iter = iter.into_iter();
116 let mut builder = <Self as Array>::Builder::new(iter.size_hint().0);
117 for i in iter {
118 builder.append(i);
119 }
120 builder.finish()
121 }
122}
123
124impl<'a> FromIterator<&'a Option<&'a [u8]>> for BytesArray {
125 fn from_iter<I: IntoIterator<Item = &'a Option<&'a [u8]>>>(iter: I) -> Self {
126 iter.into_iter().cloned().collect()
127 }
128}
129
130impl<'a> FromIterator<&'a [u8]> for BytesArray {
131 fn from_iter<I: IntoIterator<Item = &'a [u8]>>(iter: I) -> Self {
132 iter.into_iter().map(Some).collect()
133 }
134}
135
136#[derive(Debug, Clone, EstimateSize)]
138pub struct BytesArrayBuilder {
139 offset: Vec<u32>,
140 bitmap: BitmapBuilder,
141 data: Vec<u8>,
142}
143
144impl ArrayBuilder for BytesArrayBuilder {
145 type ArrayType = BytesArray;
146
147 fn new(item_capacity: usize) -> Self {
153 let mut offset = Vec::with_capacity(item_capacity + 1);
154 offset.push(0);
155 Self {
156 offset,
157 data: Vec::with_capacity(0),
158 bitmap: BitmapBuilder::with_capacity(item_capacity),
159 }
160 }
161
162 fn with_type(item_capacity: usize, ty: DataType) -> Self {
163 assert_eq!(ty, DataType::Bytea);
164 Self::new(item_capacity)
165 }
166
167 fn append_n<'a>(&'a mut self, n: usize, value: Option<&'a [u8]>) {
168 match value {
169 Some(x) => {
170 self.bitmap.append_n(n, true);
171 self.data.reserve(x.len() * n);
172 self.offset.reserve(n);
173 assert!(self.data.capacity() <= u32::MAX as usize);
174 for _ in 0..n {
175 self.data.extend_from_slice(x);
176 self.offset.push(self.data.len() as u32);
177 }
178 }
179 None => {
180 self.bitmap.append_n(n, false);
181 self.offset.reserve(n);
182 for _ in 0..n {
183 self.offset.push(self.data.len() as u32);
184 }
185 }
186 }
187 }
188
189 fn append_array(&mut self, other: &BytesArray) {
190 for bit in other.bitmap.iter() {
191 self.bitmap.append(bit);
192 }
193 self.data.extend_from_slice(&other.data);
194 let start = *self.offset.last().unwrap();
195 for other_offset in &other.offset[1..] {
196 self.offset.push(*other_offset + start);
197 }
198 }
199
200 fn pop(&mut self) -> Option<()> {
201 if self.bitmap.pop().is_some() {
202 self.offset.pop().unwrap();
203 let end = self.offset.last().unwrap();
204 self.data.truncate(*end as usize);
205 Some(())
206 } else {
207 None
208 }
209 }
210
211 fn len(&self) -> usize {
212 self.bitmap.len()
213 }
214
215 fn finish(self) -> BytesArray {
216 BytesArray {
217 bitmap: self.bitmap.finish(),
218 data: self.data.into(),
219 offset: self.offset.into(),
220 }
221 }
222}
223
224impl BytesArrayBuilder {
225 pub fn writer(&mut self) -> BytesWriter<'_> {
226 BytesWriter { builder: self }
227 }
228
229 unsafe fn append_partial(&mut self, x: &[u8]) {
232 self.data.extend_from_slice(x);
233 }
234
235 fn finish_partial(&mut self) {
239 self.offset.push(self.data.len() as u32);
240 self.bitmap.append(true);
241 }
242
243 fn rollback_partial(&mut self) {
247 let &last_offset = self.offset.last().unwrap();
248 assert!(last_offset <= self.data.len() as u32);
249 self.data.truncate(last_offset as usize);
250 }
251}
252
253pub struct BytesWriter<'a> {
254 builder: &'a mut BytesArrayBuilder,
255}
256
257impl<'a> BytesWriter<'a> {
258 pub fn write_ref(self, value: &[u8]) {
260 self.builder.append(Some(value));
261 }
262
263 pub fn begin(self) -> PartialBytesWriter<'a> {
266 PartialBytesWriter {
267 builder: self.builder,
268 }
269 }
270}
271
272pub struct PartialBytesWriter<'a> {
273 builder: &'a mut BytesArrayBuilder,
274}
275
276impl PartialBytesWriter<'_> {
277 pub fn write_ref(&mut self, value: &[u8]) {
281 unsafe { self.builder.append_partial(value) }
283 }
284
285 pub fn finish(self) {
288 self.builder.finish_partial();
289 }
290}
291
292impl Drop for PartialBytesWriter<'_> {
293 fn drop(&mut self) {
294 self.builder.rollback_partial();
296 }
297}