risingwave_common/array/
utf8_array.rs1use std::fmt::{Display, Write};
16
17use risingwave_common_estimate_size::EstimateSize;
18use risingwave_pb::data::{ArrayType, PbArray};
19
20use super::bytes_array::{BytesWriter, PartialBytesWriter};
21use super::{Array, ArrayBuilder, BytesArray, BytesArrayBuilder, DataType};
22use crate::bitmap::Bitmap;
23
24#[derive(Debug, Clone, PartialEq, Eq)]
26pub struct Utf8Array {
27 bytes: BytesArray,
28}
29
30impl EstimateSize for Utf8Array {
31 fn estimated_heap_size(&self) -> usize {
32 self.bytes.estimated_heap_size()
33 }
34}
35
36impl Array for Utf8Array {
37 type Builder = Utf8ArrayBuilder;
38 type OwnedItem = Box<str>;
39 type RefItem<'a> = &'a str;
40
41 unsafe fn raw_value_at_unchecked(&self, idx: usize) -> Self::RefItem<'_> {
42 unsafe {
43 let bytes = self.bytes.raw_value_at_unchecked(idx);
44 std::str::from_utf8_unchecked(bytes)
45 }
46 }
47
48 #[inline]
49 fn len(&self) -> usize {
50 self.bytes.len()
51 }
52
53 #[inline]
54 fn to_protobuf(&self) -> PbArray {
55 PbArray {
56 array_type: ArrayType::Utf8 as i32,
57 ..self.bytes.to_protobuf()
58 }
59 }
60
61 fn null_bitmap(&self) -> &Bitmap {
62 self.bytes.null_bitmap()
63 }
64
65 fn into_null_bitmap(self) -> Bitmap {
66 self.bytes.into_null_bitmap()
67 }
68
69 fn set_bitmap(&mut self, bitmap: Bitmap) {
70 self.bytes.set_bitmap(bitmap);
71 }
72
73 fn data_type(&self) -> DataType {
74 DataType::Varchar
75 }
76}
77
78impl<'a> FromIterator<Option<&'a str>> for Utf8Array {
79 fn from_iter<I: IntoIterator<Item = Option<&'a str>>>(iter: I) -> Self {
80 Self {
81 bytes: iter.into_iter().map(|s| s.map(|s| s.as_bytes())).collect(),
82 }
83 }
84}
85
86impl<'a> FromIterator<&'a Option<&'a str>> for Utf8Array {
87 fn from_iter<I: IntoIterator<Item = &'a Option<&'a str>>>(iter: I) -> Self {
88 iter.into_iter().cloned().collect()
89 }
90}
91
92impl<'a> FromIterator<&'a str> for Utf8Array {
93 fn from_iter<I: IntoIterator<Item = &'a str>>(iter: I) -> Self {
94 iter.into_iter().map(Some).collect()
95 }
96}
97
98impl Utf8Array {
99 pub fn into_bytes_array(self) -> BytesArray {
100 self.bytes
101 }
102
103 pub fn from_iter_display(iter: impl IntoIterator<Item = Option<impl Display>>) -> Self {
104 let iter = iter.into_iter();
105 let mut builder = Utf8ArrayBuilder::new(iter.size_hint().0);
106 for e in iter {
107 if let Some(s) = e {
108 let mut writer = builder.writer().begin();
109 write!(writer, "{}", s).unwrap();
110 writer.finish();
111 } else {
112 builder.append_null();
113 }
114 }
115 builder.finish()
116 }
117}
118
119#[derive(Debug, Clone, EstimateSize)]
121pub struct Utf8ArrayBuilder {
122 bytes: BytesArrayBuilder,
123}
124
125impl ArrayBuilder for Utf8ArrayBuilder {
126 type ArrayType = Utf8Array;
127
128 fn new(item_capacity: usize) -> Self {
134 Self {
135 bytes: BytesArrayBuilder::new(item_capacity),
136 }
137 }
138
139 fn with_type(item_capacity: usize, ty: DataType) -> Self {
140 assert_eq!(ty, DataType::Varchar);
141 Self::new(item_capacity)
142 }
143
144 #[inline]
145 fn append_n<'a>(&'a mut self, n: usize, value: Option<&'a str>) {
146 self.bytes.append_n(n, value.map(|v| v.as_bytes()));
147 }
148
149 #[inline]
150 fn append_array(&mut self, other: &Utf8Array) {
151 self.bytes.append_array(&other.bytes);
152 }
153
154 #[inline]
155 fn pop(&mut self) -> Option<()> {
156 self.bytes.pop()
157 }
158
159 fn len(&self) -> usize {
160 self.bytes.len()
161 }
162
163 fn finish(self) -> Utf8Array {
164 Utf8Array {
165 bytes: self.bytes.finish(),
166 }
167 }
168}
169
170impl Utf8ArrayBuilder {
171 pub fn writer(&mut self) -> StringWriter<'_> {
172 StringWriter {
173 bytes: self.bytes.writer(),
174 }
175 }
176
177 pub fn append_display(&mut self, value: Option<impl Display>) {
179 if let Some(s) = value {
180 let mut writer = self.writer().begin();
181 write!(writer, "{}", s).unwrap();
182 writer.finish();
183 } else {
184 self.append_null();
185 }
186 }
187}
188
189pub struct StringWriter<'a> {
190 bytes: BytesWriter<'a>,
191}
192
193impl<'a> StringWriter<'a> {
194 pub fn begin(self) -> PartialStringWriter<'a> {
197 PartialStringWriter {
198 bytes: self.bytes.begin(),
199 }
200 }
201}
202
203pub struct PartialStringWriter<'a> {
206 bytes: PartialBytesWriter<'a>,
207}
208
209impl PartialStringWriter<'_> {
210 pub fn finish(self) {
213 self.bytes.finish()
214 }
215}
216
217impl Write for PartialStringWriter<'_> {
218 fn write_str(&mut self, s: &str) -> std::fmt::Result {
219 self.bytes.write_ref(s.as_bytes());
220 Ok(())
221 }
222}
223
#[cfg(test)]
mod tests {
    use std::hash::Hash;

    use itertools::Itertools;

    use super::*;
    use crate::array::NULL_VAL_FOR_HASH;
    use crate::util::iter_util::ZipEqFast;

    /// Alternates string and null appends, then checks the finished array
    /// element by element.
    #[test]
    fn test_utf8_builder() {
        const LEN: usize = 100;

        let mut builder = Utf8ArrayBuilder::new(0);
        for i in 0..LEN {
            if i % 2 == 0 {
                builder.append(Some(&i.to_string()));
            } else {
                builder.append(None);
            }
        }

        // Verify what was built instead of discarding the finished array.
        let array = builder.finish();
        assert_eq!(array.len(), LEN);
        for i in 0..LEN {
            let expected = (i % 2 == 0).then(|| i.to_string());
            assert_eq!(array.value_at(i), expected.as_deref());
        }
    }

    /// Several `write_str` calls on one partial writer produce a single
    /// concatenated element once the writer is finished.
    #[test]
    fn test_utf8_partial_writer() {
        let mut builder = Utf8ArrayBuilder::new(0);
        {
            let writer = builder.writer();
            let mut partial_writer = writer.begin();
            for _ in 0..2 {
                partial_writer.write_str("ran").unwrap();
            }
            partial_writer.finish()
        };
        let array = builder.finish();
        assert_eq!(array.len(), 1);
        assert_eq!(array.value_at(0), Some("ranran"));
        assert_eq!(unsafe { array.value_at_unchecked(0) }, Some("ranran"));
    }

    /// A partial writer that goes out of scope without `finish` must not
    /// contribute an element to the array.
    #[test]
    fn test_utf8_partial_writer_failed() {
        let mut builder = Utf8ArrayBuilder::new(0);
        {
            // Finished write: expected to be kept.
            let writer = builder.writer();
            let mut partial_writer = writer.begin();
            partial_writer.write_str("Dia").unwrap();
            partial_writer.write_str("na").unwrap();
            partial_writer.finish()
        };

        {
            // Write left unfinished: `finish` is deliberately not called, and
            // the assertions below expect "Carol" to be absent.
            let writer = builder.writer();
            let mut partial_writer = writer.begin();
            partial_writer.write_str("Ca").unwrap();
            partial_writer.write_str("rol").unwrap();
        };

        {
            // Finished write: expected to be kept.
            let writer = builder.writer();
            let mut partial_writer = writer.begin();
            partial_writer.write_str("Ki").unwrap();
            partial_writer.write_str("ra").unwrap();
            partial_writer.finish()
        };

        let array = builder.finish();
        // Only the two finished writes are present.
        assert_eq!(array.len(), 2);
        assert_eq!(array.value_at(0), Some("Diana"));
        assert_eq!(array.value_at(1), Some("Kira"));
    }

    /// Collecting from a slice of options and iterating back yields the same
    /// values, nulls included.
    #[test]
    fn test_utf8_array() {
        let input = vec![
            Some("1"),
            Some("22"),
            None,
            Some("4444"),
            None,
            Some("666666"),
        ];

        let array = Utf8Array::from_iter(&input);
        assert_eq!(array.len(), input.len());
        assert_eq!(input, array.iter().collect_vec());
    }

    /// The protobuf form of a non-empty array carries at least two value
    /// buffers.
    #[test]
    fn test_utf8_array_to_protobuf() {
        let input = vec![
            Some("1"),
            Some("22"),
            None,
            Some("4444"),
            None,
            Some("666666"),
        ];

        let array = Utf8Array::from_iter(&input);
        let buffers = array.to_protobuf().values;
        assert!(buffers.len() >= 2);
    }

    /// Hashing three arrays row by row must match hashing the source values
    /// directly, with `NULL_VAL_FOR_HASH` standing in for nulls.
    #[test]
    fn test_utf8_array_hash() {
        use std::hash::BuildHasher;

        use super::super::test_util::{hash_finish, test_hash};

        const ARR_NUM: usize = 3;
        const ARR_LEN: usize = 90;
        // Three columns whose contents repeat with periods 2, 3 and 5.
        let vecs: [Vec<Option<&str>>; ARR_NUM] = [
            (0..ARR_LEN)
                .map(|x| match x % 2 {
                    0 => Some("1"),
                    1 => None,
                    _ => unreachable!(),
                })
                .collect_vec(),
            (0..ARR_LEN)
                .map(|x| match x % 3 {
                    0 => Some("1"),
                    1 => Some("abc"),
                    2 => None,
                    _ => unreachable!(),
                })
                .collect_vec(),
            (0..ARR_LEN)
                .map(|x| match x % 5 {
                    0 => Some("1"),
                    1 => Some("abc"),
                    2 => None,
                    3 => Some("ABCDEF"),
                    4 => Some("666666"),
                    _ => unreachable!(),
                })
                .collect_vec(),
        ];

        let arrs = vecs.iter().map(Utf8Array::from_iter).collect_vec();

        // Reference hashes: feed each row's values, column by column, into
        // that row's hasher state.
        let hasher_builder = twox_hash::xxhash64::RandomState::default();
        let mut states = vec![hasher_builder.build_hasher(); ARR_LEN];
        vecs.iter().for_each(|v| {
            v.iter()
                .zip_eq_fast(&mut states)
                .for_each(|(x, state)| match x {
                    Some(inner) => inner.hash(state),
                    None => NULL_VAL_FOR_HASH.hash(state),
                })
        });
        let hashes = hash_finish(&states[..]);

        // The combined row pattern repeats every lcm(2, 3, 5) = 30 rows, so
        // exactly 30 distinct hashes are expected among the 90 rows.
        let count = hashes.iter().counts().len();
        assert_eq!(count, 30);

        test_hash(arrs, hashes, hasher_builder);
    }
}