risingwave_common/array/
utf8_array.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{Display, Write};
16
17use risingwave_common_estimate_size::EstimateSize;
18use risingwave_pb::data::{ArrayType, PbArray};
19
20use super::bytes_array::{BytesWriter, PartialBytesWriter};
21use super::{Array, ArrayBuilder, BytesArray, BytesArrayBuilder, DataType};
22use crate::bitmap::Bitmap;
23
24/// `Utf8Array` is a collection of Rust Utf8 `str`s. It's a wrapper of `BytesArray`.
25#[derive(Debug, Clone, PartialEq, Eq)]
26pub struct Utf8Array {
27    bytes: BytesArray,
28}
29
30impl EstimateSize for Utf8Array {
31    fn estimated_heap_size(&self) -> usize {
32        self.bytes.estimated_heap_size()
33    }
34}
35
36impl Array for Utf8Array {
37    type Builder = Utf8ArrayBuilder;
38    type OwnedItem = Box<str>;
39    type RefItem<'a> = &'a str;
40
41    unsafe fn raw_value_at_unchecked(&self, idx: usize) -> Self::RefItem<'_> {
42        unsafe {
43            let bytes = self.bytes.raw_value_at_unchecked(idx);
44            std::str::from_utf8_unchecked(bytes)
45        }
46    }
47
48    #[inline]
49    fn len(&self) -> usize {
50        self.bytes.len()
51    }
52
53    #[inline]
54    fn to_protobuf(&self) -> PbArray {
55        PbArray {
56            array_type: ArrayType::Utf8 as i32,
57            ..self.bytes.to_protobuf()
58        }
59    }
60
61    fn null_bitmap(&self) -> &Bitmap {
62        self.bytes.null_bitmap()
63    }
64
65    fn into_null_bitmap(self) -> Bitmap {
66        self.bytes.into_null_bitmap()
67    }
68
69    fn set_bitmap(&mut self, bitmap: Bitmap) {
70        self.bytes.set_bitmap(bitmap);
71    }
72
73    fn data_type(&self) -> DataType {
74        DataType::Varchar
75    }
76}
77
78impl<'a> FromIterator<Option<&'a str>> for Utf8Array {
79    fn from_iter<I: IntoIterator<Item = Option<&'a str>>>(iter: I) -> Self {
80        Self {
81            bytes: iter.into_iter().map(|s| s.map(|s| s.as_bytes())).collect(),
82        }
83    }
84}
85
86impl<'a> FromIterator<&'a Option<&'a str>> for Utf8Array {
87    fn from_iter<I: IntoIterator<Item = &'a Option<&'a str>>>(iter: I) -> Self {
88        iter.into_iter().cloned().collect()
89    }
90}
91
92impl<'a> FromIterator<&'a str> for Utf8Array {
93    fn from_iter<I: IntoIterator<Item = &'a str>>(iter: I) -> Self {
94        iter.into_iter().map(Some).collect()
95    }
96}
97
98impl Utf8Array {
99    pub fn into_bytes_array(self) -> BytesArray {
100        self.bytes
101    }
102
103    pub fn from_iter_display(iter: impl IntoIterator<Item = Option<impl Display>>) -> Self {
104        let iter = iter.into_iter();
105        let mut builder = Utf8ArrayBuilder::new(iter.size_hint().0);
106        for e in iter {
107            if let Some(s) = e {
108                let mut writer = builder.writer().begin();
109                write!(writer, "{}", s).unwrap();
110                writer.finish();
111            } else {
112                builder.append_null();
113            }
114        }
115        builder.finish()
116    }
117}
118
119/// `Utf8ArrayBuilder` use `&str` to build an `Utf8Array`.
120#[derive(Debug, Clone, EstimateSize)]
121pub struct Utf8ArrayBuilder {
122    bytes: BytesArrayBuilder,
123}
124
125impl ArrayBuilder for Utf8ArrayBuilder {
126    type ArrayType = Utf8Array;
127
128    /// Creates a new `Utf8ArrayBuilder`.
129    ///
130    /// `item_capacity` is the number of items to pre-allocate. The size of the preallocated
131    /// buffer of offsets is the number of items plus one.
132    /// No additional memory is pre-allocated for the data buffer.
133    fn new(item_capacity: usize) -> Self {
134        Self {
135            bytes: BytesArrayBuilder::new(item_capacity),
136        }
137    }
138
139    fn with_type(item_capacity: usize, ty: DataType) -> Self {
140        assert_eq!(ty, DataType::Varchar);
141        Self::new(item_capacity)
142    }
143
144    #[inline]
145    fn append_n<'a>(&'a mut self, n: usize, value: Option<&'a str>) {
146        self.bytes.append_n(n, value.map(|v| v.as_bytes()));
147    }
148
149    #[inline]
150    fn append_array(&mut self, other: &Utf8Array) {
151        self.bytes.append_array(&other.bytes);
152    }
153
154    #[inline]
155    fn pop(&mut self) -> Option<()> {
156        self.bytes.pop()
157    }
158
159    fn len(&self) -> usize {
160        self.bytes.len()
161    }
162
163    fn finish(self) -> Utf8Array {
164        Utf8Array {
165            bytes: self.bytes.finish(),
166        }
167    }
168}
169
170impl Utf8ArrayBuilder {
171    pub fn writer(&mut self) -> StringWriter<'_> {
172        StringWriter {
173            bytes: self.bytes.writer(),
174        }
175    }
176
177    /// Append an element as the `Display` format to the array.
178    pub fn append_display(&mut self, value: Option<impl Display>) {
179        if let Some(s) = value {
180            let mut writer = self.writer().begin();
181            write!(writer, "{}", s).unwrap();
182            writer.finish();
183        } else {
184            self.append_null();
185        }
186    }
187}
188
189pub struct StringWriter<'a> {
190    bytes: BytesWriter<'a>,
191}
192
193impl<'a> StringWriter<'a> {
194    /// `begin` will create a `PartialStringWriter`, which allow multiple appendings to create a new
195    /// record.
196    pub fn begin(self) -> PartialStringWriter<'a> {
197        PartialStringWriter {
198            bytes: self.bytes.begin(),
199        }
200    }
201}
202
203// Note: dropping an unfinished `PartialStringWriter` will rollback the partial data, which is the
204// behavior of the inner `PartialBytesWriter`.
205pub struct PartialStringWriter<'a> {
206    bytes: PartialBytesWriter<'a>,
207}
208
209impl PartialStringWriter<'_> {
210    /// `finish` will be called while the entire record is written.
211    /// Exactly one new record was appended and the `builder` can be safely used.
212    pub fn finish(self) {
213        self.bytes.finish()
214    }
215}
216
217impl Write for PartialStringWriter<'_> {
218    fn write_str(&mut self, s: &str) -> std::fmt::Result {
219        self.bytes.write_ref(s.as_bytes());
220        Ok(())
221    }
222}
223
#[cfg(test)]
mod tests {
    use std::hash::Hash;

    use itertools::Itertools;

    use super::*;
    use crate::array::NULL_VAL_FOR_HASH;
    use crate::util::iter_util::ZipEqFast;

    #[test]
    fn test_utf8_builder() {
        let mut builder = Utf8ArrayBuilder::new(0);
        for i in 0..100 {
            if i % 2 == 0 {
                builder.append(Some(&format!("{}", i)));
            } else {
                builder.append(None);
            }
        }
        let array = builder.finish();
        // Even positions hold the decimal rendering of the index; odd positions are null.
        assert_eq!(array.len(), 100);
        for i in 0..100 {
            let expected = (i % 2 == 0).then(|| i.to_string());
            assert_eq!(array.value_at(i), expected.as_deref());
        }
    }

    #[test]
    fn test_utf8_partial_writer() {
        let mut builder = Utf8ArrayBuilder::new(0);
        {
            let writer = builder.writer();
            let mut partial_writer = writer.begin();
            for _ in 0..2 {
                partial_writer.write_str("ran").unwrap();
            }
            partial_writer.finish()
        };
        let array = builder.finish();
        assert_eq!(array.len(), 1);
        assert_eq!(array.value_at(0), Some("ranran"));
        assert_eq!(unsafe { array.value_at_unchecked(0) }, Some("ranran"));
    }

    #[test]
    fn test_utf8_partial_writer_failed() {
        let mut builder = Utf8ArrayBuilder::new(0);
        // Write a record.
        {
            let writer = builder.writer();
            let mut partial_writer = writer.begin();
            partial_writer.write_str("Dia").unwrap();
            partial_writer.write_str("na").unwrap();
            partial_writer.finish()
        };

        // Write a record failed.
        {
            let writer = builder.writer();
            let mut partial_writer = writer.begin();
            partial_writer.write_str("Ca").unwrap();
            partial_writer.write_str("rol").unwrap();
            // We don't finish here.
        };

        // Write a record.
        {
            let writer = builder.writer();
            let mut partial_writer = writer.begin();
            partial_writer.write_str("Ki").unwrap();
            partial_writer.write_str("ra").unwrap();
            partial_writer.finish()
        };

        // Verify only two valid records.
        let array = builder.finish();
        assert_eq!(array.len(), 2);
        assert_eq!(array.value_at(0), Some("Diana"));
        assert_eq!(array.value_at(1), Some("Kira"));
    }

    #[test]
    fn test_utf8_from_iter_display() {
        let array = Utf8Array::from_iter_display([Some(1), None, Some(23)]);
        assert_eq!(array.len(), 3);
        assert_eq!(array.iter().collect_vec(), vec![Some("1"), None, Some("23")]);
    }

    #[test]
    fn test_utf8_append_display() {
        let mut builder = Utf8ArrayBuilder::new(2);
        builder.append_display(Some(4.5));
        builder.append_display(None::<i32>);
        let array = builder.finish();
        assert_eq!(array.len(), 2);
        assert_eq!(array.value_at(0), Some("4.5"));
        assert_eq!(array.value_at(1), None);
    }

    #[test]
    fn test_utf8_array() {
        let input = vec![
            Some("1"),
            Some("22"),
            None,
            Some("4444"),
            None,
            Some("666666"),
        ];

        let array = Utf8Array::from_iter(&input);
        assert_eq!(array.len(), input.len());
        assert_eq!(input, array.iter().collect_vec());
    }

    #[test]
    fn test_utf8_array_to_protobuf() {
        let input = vec![
            Some("1"),
            Some("22"),
            None,
            Some("4444"),
            None,
            Some("666666"),
        ];

        let array = Utf8Array::from_iter(&input);
        let pb = array.to_protobuf();
        // The payload comes from the inner byte array, but the type tag must say UTF-8.
        assert_eq!(pb.array_type, ArrayType::Utf8 as i32);
        assert!(pb.values.len() >= 2);
    }

    #[test]
    fn test_utf8_array_hash() {
        use std::hash::BuildHasher;

        use super::super::test_util::{hash_finish, test_hash};

        const ARR_NUM: usize = 3;
        const ARR_LEN: usize = 90;
        let vecs: [Vec<Option<&str>>; ARR_NUM] = [
            (0..ARR_LEN)
                .map(|x| match x % 2 {
                    0 => Some("1"),
                    1 => None,
                    _ => unreachable!(),
                })
                .collect_vec(),
            (0..ARR_LEN)
                .map(|x| match x % 3 {
                    0 => Some("1"),
                    1 => Some("abc"),
                    2 => None,
                    _ => unreachable!(),
                })
                .collect_vec(),
            (0..ARR_LEN)
                .map(|x| match x % 5 {
                    0 => Some("1"),
                    1 => Some("abc"),
                    2 => None,
                    3 => Some("ABCDEF"),
                    4 => Some("666666"),
                    _ => unreachable!(),
                })
                .collect_vec(),
        ];

        let arrs = vecs.iter().map(Utf8Array::from_iter).collect_vec();

        let hasher_builder = twox_hash::xxhash64::RandomState::default();
        let mut states = vec![hasher_builder.build_hasher(); ARR_LEN];
        vecs.iter().for_each(|v| {
            v.iter()
                .zip_eq_fast(&mut states)
                .for_each(|(x, state)| match x {
                    Some(inner) => inner.hash(state),
                    None => NULL_VAL_FOR_HASH.hash(state),
                })
        });
        let hashes = hash_finish(&states[..]);

        let count = hashes.iter().counts().len();
        assert_eq!(count, 30);

        test_hash(arrs, hashes, hasher_builder);
    }
}