risingwave_common/array/
vector_array.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Debug;
16use std::hash::Hash;
17use std::slice;
18
19use bytes::{Buf, BufMut};
20use itertools::{Itertools, repeat_n};
21use memcomparable::Error;
22use risingwave_common::types::F32;
23use risingwave_common_estimate_size::EstimateSize;
24use risingwave_pb::common::PbBuffer;
25use risingwave_pb::common::buffer::PbCompressionType;
26use risingwave_pb::data::{PbArray, PbArrayType, PbListArrayData};
27use serde::{Deserialize, Serialize};
28
29use super::{Array, ArrayBuilder};
30use crate::bitmap::{Bitmap, BitmapBuilder};
31use crate::types::{DataType, Scalar, ScalarRef, ToText};
32use crate::vector::{VectorInner, decode_vector_payload, encode_vector_payload};
33
/// Element type stored in vector arrays and vector scalars in this module.
pub type VectorItemType = F32;
/// Floating-point type for vector distances, as the name suggests; not referenced elsewhere in
/// this file.
pub type VectorDistanceType = f64;
/// [`DataType`] counterpart of [`VectorItemType`].
pub const VECTOR_ITEM_TYPE: DataType = DataType::Float32;
/// [`DataType`] counterpart of [`VectorDistanceType`].
pub const VECTOR_DISTANCE_TYPE: DataType = DataType::Float64;
38
/// Builder for [`VectorArray`].
///
/// `finish` moves the buffers below into the array unchanged (only the bitmap builder is
/// finalized).
#[derive(Debug, Clone, EstimateSize)]
pub struct VectorArrayBuilder {
    /// One bit per appended row: `true` for a vector, `false` for null.
    bitmap: BitmapBuilder,
    /// Starts as `[0]`; every appended row pushes the end offset of that row within `inner`.
    /// Null rows repeat the previous offset because they add nothing to `inner`.
    offsets: Vec<u32>,
    /// Concatenated elements of all non-null rows.
    inner: Vec<VectorItemType>,
    /// Number of elements of every non-null row, taken from `DataType::Vector(..)`.
    elem_size: usize,
}
46
47impl ArrayBuilder for VectorArrayBuilder {
48    type ArrayType = VectorArray;
49
50    #[cfg(not(test))]
51    fn new(_capacity: usize) -> Self {
52        panic!("please use `VectorArrayBuilder::with_type` instead");
53    }
54
55    #[cfg(test)]
56    fn new(capacity: usize) -> Self {
57        Self::with_type(capacity, VectorVal::test_type())
58    }
59
60    fn with_type(capacity: usize, ty: DataType) -> Self {
61        let DataType::Vector(elem_size) = ty else {
62            panic!("VectorArrayBuilder only supports Vector type");
63        };
64        let mut offsets = Vec::with_capacity(capacity + 1);
65        offsets.push(0);
66        Self {
67            bitmap: BitmapBuilder::with_capacity(capacity),
68            offsets,
69            inner: Vec::with_capacity(capacity * elem_size),
70            elem_size,
71        }
72    }
73
74    fn append_n(&mut self, n: usize, value: Option<VectorRef<'_>>) {
75        let last = self
76            .offsets
77            .last()
78            .cloned()
79            .expect("non-empty with an initial 0");
80        if let Some(value) = value {
81            assert_eq!(self.elem_size, value.inner.len());
82            self.inner.reserve(self.elem_size * n);
83            for _ in 0..n {
84                self.inner.extend_from_slice(value.inner);
85            }
86            self.offsets.reserve(n);
87            self.offsets.extend((1..=n).map(|i| {
88                last.checked_add((i * self.elem_size) as _)
89                    .expect("overflow")
90            }));
91            self.bitmap.append_n(n, true);
92        } else {
93            self.offsets.reserve(n);
94            self.offsets.extend(repeat_n(last, n));
95            self.bitmap.append_n(n, false);
96        }
97    }
98
99    fn append_array(&mut self, other: &VectorArray) {
100        assert_eq!(self.elem_size, other.elem_size);
101        self.bitmap.append_bitmap(&other.bitmap);
102        let last = self
103            .offsets
104            .last()
105            .cloned()
106            .expect("non-empty with an initial 0");
107        let other_offsets = &other.offsets[1..];
108        self.offsets.reserve(other_offsets.len());
109        self.offsets.extend(
110            other_offsets
111                .iter()
112                .map(|offset| last.checked_add(*offset).expect("overflow")),
113        );
114        self.inner.reserve(other.inner.len());
115        self.inner.extend_from_slice(&other.inner);
116    }
117
118    fn pop(&mut self) -> Option<()> {
119        if self.bitmap.pop().is_some() {
120            self.offsets
121                .pop()
122                .expect("non-empty when bitmap popped Some");
123            let last = self
124                .offsets
125                .last()
126                .cloned()
127                .expect("non-empty with initial 0");
128            self.inner.truncate(last as _);
129            Some(())
130        } else {
131            None
132        }
133    }
134
135    fn len(&self) -> usize {
136        self.bitmap.len()
137    }
138
139    fn finish(self) -> VectorArray {
140        VectorArray {
141            bitmap: self.bitmap.finish(),
142            offsets: self.offsets,
143            inner: self.inner,
144            elem_size: self.elem_size,
145        }
146    }
147}
148
/// Array of fixed-dimension vectors whose elements are stored contiguously in `inner`.
#[derive(Debug, Clone)]
pub struct VectorArray {
    /// Null bitmap; its length is the number of rows.
    bitmap: Bitmap,
    /// Of size as `bitmap.len() + 1`. `(self.offsets[i]..self.offsets[i+1])` is the slice range of the i-th vector
    /// if it's not null.
    offsets: Vec<u32>,
    /// Concatenated elements of all non-null rows.
    inner: Vec<VectorItemType>,
    /// Number of elements of every non-null row; reported through `DataType::Vector(elem_size)`.
    elem_size: usize,
}
158
159impl EstimateSize for VectorArray {
160    fn estimated_heap_size(&self) -> usize {
161        self.inner.estimated_heap_size()
162    }
163}
164
165impl Array for VectorArray {
166    type Builder = VectorArrayBuilder;
167    type OwnedItem = VectorVal;
168    type RefItem<'a> = VectorRef<'a>;
169
170    unsafe fn raw_value_at_unchecked(&self, idx: usize) -> Self::RefItem<'_> {
171        VectorRef {
172            inner: unsafe {
173                let start = self.inner.as_ptr().add(self.offsets[idx] as usize);
174                slice::from_raw_parts(start, self.elem_size)
175            },
176        }
177    }
178
179    fn len(&self) -> usize {
180        self.bitmap.len()
181    }
182
183    fn to_protobuf(&self) -> PbArray {
184        let mut payload = Vec::with_capacity(self.inner.len() * size_of::<VectorItemType>());
185        encode_vector_payload(self.inner.as_slice(), &mut payload);
186        PbArray {
187            array_type: PbArrayType::Vector as _,
188            null_bitmap: Some(self.bitmap.to_protobuf()),
189            values: vec![PbBuffer {
190                compression: PbCompressionType::None as _,
191                body: payload,
192            }],
193            struct_array_data: None,
194            list_array_data: Some(
195                PbListArrayData {
196                    offsets: self.offsets.clone(),
197                    value: None,
198                    value_type: Some(DataType::Float32.to_protobuf()),
199                    elem_size: Some(self.elem_size as _),
200                }
201                .into(),
202            ),
203        }
204    }
205
206    fn null_bitmap(&self) -> &Bitmap {
207        &self.bitmap
208    }
209
210    fn into_null_bitmap(self) -> Bitmap {
211        self.bitmap
212    }
213
214    fn set_bitmap(&mut self, bitmap: Bitmap) {
215        self.bitmap = bitmap;
216    }
217
218    fn data_type(&self) -> DataType {
219        DataType::Vector(self.elem_size)
220    }
221}
222
223impl VectorArray {
224    pub fn from_protobuf(
225        array: &risingwave_pb::data::PbArray,
226    ) -> super::ArrayResult<super::ArrayImpl> {
227        // reversing to_protobuf
228        assert_eq!(
229            array.array_type,
230            PbArrayType::Vector as i32,
231            "invalid array type for vector: {}",
232            array.array_type
233        );
234        let bitmap: Bitmap = array.get_null_bitmap()?.into();
235        let encoded_payload = &array.values[0].body;
236        let payload = decode_vector_payload(
237            encoded_payload
238                .len()
239                .checked_exact_div(size_of::<VectorItemType>())
240                .unwrap_or_else(|| {
241                    panic!("invalid payload len {} for vector", encoded_payload.len(),)
242                }),
243            array.values[0].body.as_slice(),
244        );
245        let array_data = array.get_list_array_data()?;
246        let elem_size = array_data.elem_size.expect("should exist for Vector") as usize;
247        let offsets = array_data.offsets.clone();
248        debug_assert_eq!(array_data.value_type, Some(DataType::Float32.to_protobuf()));
249        debug_assert_eq!(array_data.value, None);
250
251        Ok(VectorArray {
252            bitmap,
253            offsets,
254            inner: payload,
255            elem_size,
256        }
257        .into())
258    }
259
260    pub fn as_raw_slice(&self) -> &[f32] {
261        F32::inner_slice(&self.inner)
262    }
263
264    pub fn offsets(&self) -> &[u32] {
265        &self.offsets
266    }
267}
268
/// Owned vector scalar: a boxed slice of [`VectorItemType`].
pub type VectorVal = VectorInner<Box<[VectorItemType]>>;
270
271impl Debug for VectorVal {
272    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
273        self.to_ref().fmt(f)
274    }
275}
276
277impl Scalar for VectorVal {
278    type ScalarRefType<'a> = VectorRef<'a>;
279
280    fn as_scalar_ref(&self) -> VectorRef<'_> {
281        self.to_ref()
282    }
283}
284
285impl VectorVal {
286    #[cfg(test)]
287    pub const TEST_VECTOR_DIMENSION: usize = 3;
288
289    pub fn from_text(text: &str, size: usize) -> Result<Self, String> {
290        let text = text.trim();
291        let text = text
292            .strip_prefix('[')
293            .ok_or_else(|| "vector must start with [".to_owned())?
294            .strip_suffix(']')
295            .ok_or_else(|| "vector must end with ]".to_owned())?;
296        let inner = text
297            .split(',')
298            .map(|s| {
299                s.trim()
300                    .parse::<f32>()
301                    .map_err(|_| format!("invalid f32: {s}"))
302                    .and_then(|f| {
303                        if f.is_finite() {
304                            Ok(f.into())
305                        } else {
306                            Err(format!("{f} not allowed in vector"))
307                        }
308                    })
309            })
310            .collect::<Result<Vec<_>, _>>()?;
311        if inner.len() != size {
312            return Err(format!("expected {} dimensions, not {}", size, inner.len()));
313        }
314        Ok(Self {
315            inner: inner.into(),
316        })
317    }
318
319    #[cfg(test)]
320    pub fn test_type() -> DataType {
321        DataType::Vector(Self::TEST_VECTOR_DIMENSION)
322    }
323
324    pub fn to_ref(&self) -> VectorRef<'_> {
325        VectorRef { inner: &self.inner }
326    }
327}
328
/// An `f32` that is guaranteed to be neither NaN nor infinite.
///
/// Serves as an intermediate type so that `f32` values can be `try_collect`ed into a `VectorVal`.
#[derive(Clone, Copy, Debug)]
#[repr(transparent)]
pub struct Finite32(f32);

impl TryFrom<f32> for Finite32 {
    type Error = String;

    /// Wraps `value`, rejecting NaN and both infinities with a descriptive message.
    fn try_from(value: f32) -> Result<Self, Self::Error> {
        if !value.is_finite() {
            return Err(format!("{value} not allowed in vector"));
        }
        Ok(Self(value))
    }
}
345
346impl From<Vec<Finite32>> for VectorVal {
347    fn from(value: Vec<Finite32>) -> Self {
348        let (ptr, len, cap) = value.into_raw_parts();
349        // Safety: OrderedFloat is #[repr(transparent)] and has no invalid values.
350        Self {
351            inner: unsafe { Vec::from_raw_parts(ptr as *mut F32, len, cap).into_boxed_slice() },
352        }
353    }
354}
355
356impl FromIterator<Finite32> for VectorVal {
357    fn from_iter<I: IntoIterator<Item = Finite32>>(iter: I) -> Self {
358        let inner = iter.into_iter().collect_vec();
359        Self::from(inner)
360    }
361}
362
/// Borrowed vector scalar: a slice of [`VectorItemType`].
pub type VectorRef<'a> = VectorInner<&'a [VectorItemType]>;
364
365impl Debug for VectorRef<'_> {
366    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
367        self.write_with_type(&DataType::Vector(self.dimension()), f)
368    }
369}
370
371impl ToText for VectorRef<'_> {
372    fn write<W: std::fmt::Write>(&self, f: &mut W) -> std::fmt::Result {
373        self.write_with_type(&DataType::Vector(self.dimension()), f)
374    }
375
376    fn write_with_type<W: std::fmt::Write>(&self, _ty: &DataType, f: &mut W) -> std::fmt::Result {
377        write!(f, "[")?;
378        for (i, item) in self.inner.iter().enumerate() {
379            if i > 0 {
380                write!(f, ",")?;
381            }
382            write!(f, "{}", item)?;
383        }
384        write!(f, "]")
385    }
386}
387
388impl<'a> ScalarRef<'a> for VectorRef<'a> {
389    type ScalarType = VectorVal;
390
391    fn to_owned_scalar(&self) -> VectorVal {
392        VectorVal {
393            inner: self.inner.to_vec().into_boxed_slice(),
394        }
395    }
396
397    fn hash_scalar<H: std::hash::Hasher>(&self, state: &mut H) {
398        self.inner.hash(state);
399    }
400}
401
402impl<'a> VectorRef<'a> {
403    /// Create a `VectorRef` from a slice of `VectorItemType` without checking the elements in the slice
404    /// is invalid, such as `inf` and `nan`.
405    pub fn from_slice_unchecked(inner: &'a [VectorItemType]) -> Self {
406        Self { inner }
407    }
408
409    pub fn memcmp_serialize(
410        self,
411        serializer: &mut memcomparable::Serializer<impl BufMut>,
412    ) -> memcomparable::Result<()> {
413        for item in self.inner {
414            item.serialize(&mut *serializer)?;
415        }
416        Ok(())
417    }
418}
419
420impl VectorVal {
421    pub fn memcmp_deserialize(
422        dimension: usize,
423        de: &mut memcomparable::Deserializer<impl Buf>,
424    ) -> memcomparable::Result<Self> {
425        let mut value = Vec::with_capacity(dimension);
426        for _ in 0..dimension {
427            value.push(Finite32::try_from(f32::deserialize(&mut *de)?).map_err(Error::Message)?)
428        }
429        Ok(VectorVal::from(value))
430    }
431}