risingwave_common/array/bytes_array.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::iter;
use std::mem::size_of;

use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::common::Buffer;
use risingwave_pb::common::buffer::CompressionType;
use risingwave_pb::data::{ArrayType, PbArray};

use super::{Array, ArrayBuilder, DataType};
use crate::bitmap::{Bitmap, BitmapBuilder};
use crate::util::iter_util::ZipEqDebug;

/// `BytesArray` is a collection of Rust `[u8]`s.
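///
/// Values are stored in a flattened layout: `data` concatenates all values back to
/// back, and `offset[i]..offset[i + 1]` locates the `i`-th value inside `data`. As an
/// illustration, `[Some(b"ab"), None, Some(b"c")]` is stored as `data = b"abc"` and
/// `offset = [0, 2, 2, 3]`, with the null bitmap marking index 1 as null.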
#[derive(Debug, Clone, PartialEq, Eq, EstimateSize)]
pub struct BytesArray {
    offset: Box<[u32]>,
    bitmap: Bitmap,
    data: Box<[u8]>,
}

impl Array for BytesArray {
    type Builder = BytesArrayBuilder;
    type OwnedItem = Box<[u8]>;
    type RefItem<'a> = &'a [u8];

    unsafe fn raw_value_at_unchecked(&self, idx: usize) -> &[u8] {
        unsafe {
            let begin = *self.offset.get_unchecked(idx) as usize;
            let end = *self.offset.get_unchecked(idx + 1) as usize;
            self.data.get_unchecked(begin..end)
        }
    }

    fn len(&self) -> usize {
        self.offset.len() - 1
    }

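    /// Converts the array to its protobuf representation: one buffer with the
    /// big-endian `u64` offsets of the non-null values (plus the final end offset),
    /// one buffer with the raw value data, and the null bitmap.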
    fn to_protobuf(&self) -> PbArray {
        let offset_buffer = self
            .offset
            .iter()
            // The length of `offset` is n + 1 while the length of `null_bitmap` is n,
            // so chain the `null_bitmap` iterator with a single `true` here to also
            // push the final offset into `offset_buffer`.
            .zip_eq_debug(self.null_bitmap().iter().chain(iter::once(true)))
            .fold(
                Vec::<u8>::with_capacity(self.data.len() * size_of::<usize>()),
                |mut buffer, (offset, not_null)| {
                    // TODO: force convert usize to u64, frontend will treat this offset buffer as
                    // u64
                    if not_null {
                        let offset = *offset as u64;
                        buffer.extend_from_slice(&offset.to_be_bytes());
                    }
                    buffer
                },
            );

        let data_buffer = self.data.clone();

        let values = vec![
            Buffer {
                compression: CompressionType::None as i32,
                body: offset_buffer,
            },
            Buffer {
                compression: CompressionType::None as i32,
                body: data_buffer.into(),
            },
        ];
        let null_bitmap = self.null_bitmap().to_protobuf();
        PbArray {
            null_bitmap: Some(null_bitmap),
            values,
            array_type: ArrayType::Bytea as i32,
            struct_array_data: None,
            list_array_data: None,
        }
    }

    fn null_bitmap(&self) -> &Bitmap {
        &self.bitmap
    }

    fn into_null_bitmap(self) -> Bitmap {
        self.bitmap
    }

    fn set_bitmap(&mut self, bitmap: Bitmap) {
        self.bitmap = bitmap;
    }

    fn data_type(&self) -> DataType {
        DataType::Bytea
    }
}

impl<'a> FromIterator<Option<&'a [u8]>> for BytesArray {
    fn from_iter<I: IntoIterator<Item = Option<&'a [u8]>>>(iter: I) -> Self {
        let iter = iter.into_iter();
        let mut builder = <Self as Array>::Builder::new(iter.size_hint().0);
        for i in iter {
            builder.append(i);
        }
        builder.finish()
    }
}

impl<'a> FromIterator<&'a Option<&'a [u8]>> for BytesArray {
    fn from_iter<I: IntoIterator<Item = &'a Option<&'a [u8]>>>(iter: I) -> Self {
        iter.into_iter().cloned().collect()
    }
}

impl<'a> FromIterator<&'a [u8]> for BytesArray {
    fn from_iter<I: IntoIterator<Item = &'a [u8]>>(iter: I) -> Self {
        iter.into_iter().map(Some).collect()
    }
}

/// `BytesArrayBuilder` uses `&[u8]` to build a `BytesArray`.
#[derive(Debug, Clone, EstimateSize)]
pub struct BytesArrayBuilder {
    offset: Vec<u32>,
    bitmap: BitmapBuilder,
    data: Vec<u8>,
}

impl ArrayBuilder for BytesArrayBuilder {
    type ArrayType = BytesArray;

    /// Creates a new `BytesArrayBuilder`.
    ///
    /// `item_capacity` is the number of items to pre-allocate. The size of the preallocated
    /// buffer of offsets is the number of items plus one.
    /// No additional memory is pre-allocated for the data buffer.
    fn new(item_capacity: usize) -> Self {
        let mut offset = Vec::with_capacity(item_capacity + 1);
        offset.push(0);
        Self {
            offset,
            data: Vec::with_capacity(0),
            bitmap: BitmapBuilder::with_capacity(item_capacity),
        }
    }

    fn with_type(item_capacity: usize, ty: DataType) -> Self {
        assert_eq!(ty, DataType::Bytea);
        Self::new(item_capacity)
    }

    fn append_n<'a>(&'a mut self, n: usize, value: Option<&'a [u8]>) {
        match value {
            Some(x) => {
                self.bitmap.append_n(n, true);
                self.data.reserve(x.len() * n);
                self.offset.reserve(n);
                assert!(self.data.capacity() <= u32::MAX as usize);
                for _ in 0..n {
                    self.data.extend_from_slice(x);
                    self.offset.push(self.data.len() as u32);
                }
            }
            None => {
                self.bitmap.append_n(n, false);
                self.offset.reserve(n);
                for _ in 0..n {
                    self.offset.push(self.data.len() as u32);
                }
            }
        }
    }

    fn append_array(&mut self, other: &BytesArray) {
        for bit in other.bitmap.iter() {
            self.bitmap.append(bit);
        }
        self.data.extend_from_slice(&other.data);
        let start = *self.offset.last().unwrap();
        for other_offset in &other.offset[1..] {
            self.offset.push(*other_offset + start);
        }
    }

    fn pop(&mut self) -> Option<()> {
        if self.bitmap.pop().is_some() {
            self.offset.pop().unwrap();
            let end = self.offset.last().unwrap();
            self.data.truncate(*end as usize);
            Some(())
        } else {
            None
        }
    }

    fn len(&self) -> usize {
        self.bitmap.len()
    }

    fn finish(self) -> BytesArray {
        BytesArray {
            bitmap: self.bitmap.finish(),
            data: self.data.into(),
            offset: self.offset.into(),
        }
    }
}

impl BytesArrayBuilder {
    pub fn writer(&mut self) -> BytesWriter<'_> {
        BytesWriter { builder: self }
    }

    /// `append_partial` appends partial, dirty data of the new record.
    /// The partial data remains untracked until `finish_partial` is called.
    unsafe fn append_partial(&mut self, x: &[u8]) {
        self.data.extend_from_slice(x);
    }

    /// `finish_partial` creates a new record based on the current dirty data.
    /// `finish_partial` is safe even if `append_partial` was never called, in which case
    /// it is equivalent to appending empty bytes.
    fn finish_partial(&mut self) {
        self.offset.push(self.data.len() as u32);
        self.bitmap.append(true);
    }

    /// Rolls back the data partially written by [`Self::append_partial`].
    ///
    /// This is a safe method; if no `append_partial` was called, the call has no effect.
    fn rollback_partial(&mut self) {
        let &last_offset = self.offset.last().unwrap();
        assert!(last_offset <= self.data.len() as u32);
        self.data.truncate(last_offset as usize);
    }
}

pub struct BytesWriter<'a> {
    builder: &'a mut BytesArrayBuilder,
}

impl<'a> BytesWriter<'a> {
    /// `write_ref` consumes the `BytesWriter` and appends `value` to the builder as a new record.
    pub fn write_ref(self, value: &[u8]) {
        self.builder.append(Some(value));
    }

    /// `begin` creates a `PartialBytesWriter`, which allows appending multiple times to
    /// build a single new record.
    pub fn begin(self) -> PartialBytesWriter<'a> {
        PartialBytesWriter {
            builder: self.builder,
        }
    }
}

pub struct PartialBytesWriter<'a> {
    builder: &'a mut BytesArrayBuilder,
}

impl PartialBytesWriter<'_> {
    /// `write_ref` will append partial dirty data to `builder`.
    /// `PartialBytesWriter::write_ref` is different from `BytesWriter::write_ref`
    /// in that it allows us to call it multiple times.
    pub fn write_ref(&mut self, value: &[u8]) {
        // SAFETY: We'll clean the dirty `builder` in the `drop`.
        unsafe { self.builder.append_partial(value) }
    }

    /// `finish` should be called once the entire record has been written.
    /// Exactly one new record is appended and the `builder` can then be safely used.
    pub fn finish(self) {
        self.builder.finish_partial();
    }
}

impl Drop for PartialBytesWriter<'_> {
    fn drop(&mut self) {
        // If `finish` is not called, we should rollback the data.
        self.builder.rollback_partial();
    }
}
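
#[cfg(test)]
mod tests {
    use super::*;

    // Minimal usage sketch for the writer APIs above: whole-value writes via
    // `BytesWriter::write_ref` and chunked writes via `PartialBytesWriter`.
    // It relies only on items defined in this module and the `Array` methods
    // implemented above.
    #[test]
    fn test_bytes_writer_usage() {
        let mut builder = BytesArrayBuilder::new(2);

        // Whole-value path: consumes the writer and appends one complete record.
        builder.writer().write_ref(b"hello");

        // Partial path: assemble one record from several chunks, then seal it with
        // `finish`; dropping the partial writer without `finish` would roll the
        // chunks back instead.
        let mut partial = builder.writer().begin();
        partial.write_ref(b"wo");
        partial.write_ref(b"rld");
        partial.finish();

        let array = builder.finish();
        assert_eq!(array.len(), 2);
        // SAFETY: both indices are within bounds (`len() == 2`).
        unsafe {
            assert_eq!(array.raw_value_at_unchecked(0), b"hello");
            assert_eq!(array.raw_value_at_unchecked(1), b"world");
        }
    }
}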