risingwave_common/array/bytes_array.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io::Write;
use std::iter;
use std::mem::{ManuallyDrop, size_of};

use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::common::Buffer;
use risingwave_pb::common::buffer::CompressionType;
use risingwave_pb::data::{ArrayType, PbArray};

use super::{Array, ArrayBuilder, DataType};
use crate::bitmap::{Bitmap, BitmapBuilder};
use crate::util::iter_util::ZipEqDebug;

/// `BytesArray` is a collection of Rust `[u8]`s.
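///
/// The variable-length values are stored back-to-back in `data`; `offset[i]..offset[i + 1]`
/// locates the `i`-th value, and `bitmap` marks which entries are non-null.
///
/// Illustrative example (not from the original source): appending
/// `[Some(b"ab"), None, Some(b"c")]` yields `offset = [0, 2, 2, 3]`,
/// `data = b"abc"`, and a null bitmap of `[true, false, true]`.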
#[derive(Debug, Clone, PartialEq, Eq, EstimateSize)]
pub struct BytesArray {
    offset: Box<[u32]>,
    bitmap: Bitmap,
    data: Box<[u8]>,
}

impl Array for BytesArray {
    type Builder = BytesArrayBuilder;
    type OwnedItem = Box<[u8]>;
    type RefItem<'a> = &'a [u8];

    unsafe fn raw_value_at_unchecked(&self, idx: usize) -> &[u8] {
        unsafe {
            let begin = *self.offset.get_unchecked(idx) as usize;
            let end = *self.offset.get_unchecked(idx + 1) as usize;
            self.data.get_unchecked(begin..end)
        }
    }

    fn len(&self) -> usize {
        self.offset.len() - 1
    }

    fn to_protobuf(&self) -> PbArray {
        let offset_buffer = self
            .offset
            .iter()
            // The length of `offset` is n + 1 while the length of `null_bitmap`
            // is n, so chain the `null_bitmap` iterator with a single `true`
            // here to also push the end of `offset` into `offset_buffer`.
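            // Illustrative example (not from the original comment): with
            // `offset = [0, 2, 2, 3]` and `null_bitmap = [true, false, true]`,
            // the emitted offsets are 0, 2 and 3 (the offset of the null entry
            // is skipped), each encoded as a big-endian u64.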
            .zip_eq_debug(self.null_bitmap().iter().chain(iter::once(true)))
            .fold(
                Vec::<u8>::with_capacity(self.data.len() * size_of::<usize>()),
                |mut buffer, (offset, not_null)| {
                    // TODO: the u32 offset is force-converted to u64, as the frontend
                    // will treat this offset buffer as u64
                    if not_null {
                        let offset = *offset as u64;
                        buffer.extend_from_slice(&offset.to_be_bytes());
                    }
                    buffer
                },
            );

        let data_buffer = self.data.clone();

        let values = vec![
            Buffer {
                compression: CompressionType::None as i32,
                body: offset_buffer,
            },
            Buffer {
                compression: CompressionType::None as i32,
                body: data_buffer.into(),
            },
        ];
        let null_bitmap = self.null_bitmap().to_protobuf();
        PbArray {
            null_bitmap: Some(null_bitmap),
            values,
            array_type: ArrayType::Bytea as i32,
            struct_array_data: None,
            list_array_data: None,
        }
    }

    fn null_bitmap(&self) -> &Bitmap {
        &self.bitmap
    }

    fn into_null_bitmap(self) -> Bitmap {
        self.bitmap
    }

    fn set_bitmap(&mut self, bitmap: Bitmap) {
        self.bitmap = bitmap;
    }

    fn data_type(&self) -> DataType {
        DataType::Bytea
    }
}

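/// Collects optional byte slices into a [`BytesArray`].
///
/// A minimal usage sketch (illustrative only):
///
/// ```ignore
/// let array: BytesArray = [Some(b"ab".as_slice()), None, Some(b"c".as_slice())]
///     .into_iter()
///     .collect();
/// assert_eq!(array.len(), 3);
/// ```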
impl<'a> FromIterator<Option<&'a [u8]>> for BytesArray {
    fn from_iter<I: IntoIterator<Item = Option<&'a [u8]>>>(iter: I) -> Self {
        let iter = iter.into_iter();
        let mut builder = <Self as Array>::Builder::new(iter.size_hint().0);
        for i in iter {
            builder.append(i);
        }
        builder.finish()
    }
}

impl<'a> FromIterator<&'a Option<&'a [u8]>> for BytesArray {
    fn from_iter<I: IntoIterator<Item = &'a Option<&'a [u8]>>>(iter: I) -> Self {
        iter.into_iter().cloned().collect()
    }
}

impl<'a> FromIterator<&'a [u8]> for BytesArray {
    fn from_iter<I: IntoIterator<Item = &'a [u8]>>(iter: I) -> Self {
        iter.into_iter().map(Some).collect()
    }
}

/// `BytesArrayBuilder` uses `&[u8]` to build a `BytesArray`.
#[derive(Debug, Clone, EstimateSize)]
pub struct BytesArrayBuilder {
    offset: Vec<u32>,
    bitmap: BitmapBuilder,
    data: Vec<u8>,
}

impl ArrayBuilder for BytesArrayBuilder {
    type ArrayType = BytesArray;

    /// Creates a new `BytesArrayBuilder`.
    ///
    /// `item_capacity` is the number of items to pre-allocate. The size of the preallocated
    /// buffer of offsets is the number of items plus one.
    /// No additional memory is pre-allocated for the data buffer.
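    ///
    /// A minimal usage sketch (illustrative only):
    ///
    /// ```ignore
    /// let mut builder = BytesArrayBuilder::new(4);
    /// builder.append(Some(b"bytes".as_slice()));
    /// builder.append(None);
    /// let array = builder.finish();
    /// assert_eq!(array.len(), 2);
    /// ```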
    fn new(item_capacity: usize) -> Self {
        let mut offset = Vec::with_capacity(item_capacity + 1);
        offset.push(0);
        Self {
            offset,
            data: Vec::with_capacity(0),
            bitmap: BitmapBuilder::with_capacity(item_capacity),
        }
    }

    fn with_type(item_capacity: usize, ty: DataType) -> Self {
        assert_eq!(ty, DataType::Bytea);
        Self::new(item_capacity)
    }

    fn append_n<'a>(&'a mut self, n: usize, value: Option<&'a [u8]>) {
        match value {
            Some(x) => {
                self.bitmap.append_n(n, true);
                self.data.reserve(x.len() * n);
                self.offset.reserve(n);
                assert!(self.data.capacity() <= u32::MAX as usize);
                for _ in 0..n {
                    self.data.extend_from_slice(x);
                    self.offset.push(self.data.len() as u32);
                }
            }
            None => {
                self.bitmap.append_n(n, false);
                self.offset.reserve(n);
                for _ in 0..n {
                    self.offset.push(self.data.len() as u32);
                }
            }
        }
    }

    fn append_array(&mut self, other: &BytesArray) {
        for bit in other.bitmap.iter() {
            self.bitmap.append(bit);
        }
        self.data.extend_from_slice(&other.data);
        let start = *self.offset.last().unwrap();
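        // Rebase `other`'s offsets onto the end of our existing data. Illustrative
        // example (not from the original source): if our last offset was 5 and
        // `other.offset` is [0, 2, 3], we push 7 and 8.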
        for other_offset in &other.offset[1..] {
            self.offset.push(*other_offset + start);
        }
    }

    fn pop(&mut self) -> Option<()> {
        if self.bitmap.pop().is_some() {
            self.offset.pop().unwrap();
            let end = self.offset.last().unwrap();
            self.data.truncate(*end as usize);
            Some(())
        } else {
            None
        }
    }

    fn len(&self) -> usize {
        self.bitmap.len()
    }

    fn finish(self) -> BytesArray {
        BytesArray {
            bitmap: self.bitmap.finish(),
            data: self.data.into(),
            offset: self.offset.into(),
        }
    }
}

impl BytesArrayBuilder {
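    /// Returns a [`BytesWriter`] that writes a single record incrementally.
    ///
    /// A minimal usage sketch (illustrative only):
    ///
    /// ```ignore
    /// let mut builder = BytesArrayBuilder::new(1);
    /// let mut writer = builder.writer();
    /// writer.write_ref(b"hello, ");
    /// writer.write_ref(b"bytes");
    /// writer.finish(); // appends exactly one record: b"hello, bytes"
    /// ```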
    pub fn writer(&mut self) -> BytesWriter<'_> {
        BytesWriter { builder: self }
    }

    /// `append_partial` appends partial (dirty) data of a new record.
    /// The partial data remains untracked until `finish_partial` is called.
    unsafe fn append_partial(&mut self, x: &[u8]) {
        self.data.extend_from_slice(x);
    }

    /// `finish_partial` creates a new record based on the current dirty data.
    /// `finish_partial` is safe even if `append_partial` was never called, which is
    /// equivalent to appending empty bytes.
    fn finish_partial(&mut self) {
        self.offset.push(self.data.len() as u32);
        self.bitmap.append(true);
    }

    /// Rolls back the data partially written by [`Self::append_partial`].
    ///
    /// This is a safe method; if `append_partial` was never called, the call has no effect.
    fn rollback_partial(&mut self) {
        let &last_offset = self.offset.last().unwrap();
        assert!(last_offset <= self.data.len() as u32);
        self.data.truncate(last_offset as usize);
    }
}

/// Note: dropping an unfinished `BytesWriter` will roll back the partial data.
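///
/// A hedged sketch of the rollback behavior (illustrative only):
///
/// ```ignore
/// let mut writer = builder.writer();
/// writer.write_ref(b"partial");
/// drop(writer); // the partial bytes are discarded and no record is appended
/// ```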
pub struct BytesWriter<'a> {
    builder: &'a mut BytesArrayBuilder,
}

impl BytesWriter<'_> {
    /// `write_ref` appends partial dirty data to the `builder`.
    pub fn write_ref(&mut self, value: &[u8]) {
        // SAFETY: We'll clean up the dirty `builder` in `drop`.
        unsafe { self.builder.append_partial(value) }
    }

    /// `finish` should be called once the entire record has been written.
    /// Exactly one new record is appended, and the `builder` can then be safely used again.
    pub fn finish(self) {
        self.builder.finish_partial();
        let _ = ManuallyDrop::new(self); // Prevent drop
    }
}

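/// `BytesWriter` implements [`std::io::Write`], so a record can also be produced with
/// formatting macros. A hedged sketch (illustrative only):
///
/// ```ignore
/// use std::io::Write;
///
/// let mut writer = builder.writer();
/// write!(writer, "{}-{}", 1, 2).unwrap();
/// writer.finish(); // appends b"1-2"
/// ```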
impl Write for BytesWriter<'_> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.write_ref(buf);
        Ok(buf.len())
    }

    fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
        self.write_ref(buf);
        Ok(())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

impl Drop for BytesWriter<'_> {
    fn drop(&mut self) {
        // If `finish` is not called, we should roll back the data.
        self.builder.rollback_partial();
    }
}