1use std::fmt::Debug;
16use std::hash::Hash;
17use std::slice;
18
19use bytes::{Buf, BufMut};
20use itertools::{Itertools, repeat_n};
21use memcomparable::Error;
22use risingwave_common::types::F32;
23use risingwave_common_estimate_size::EstimateSize;
24use risingwave_pb::common::PbBuffer;
25use risingwave_pb::common::buffer::PbCompressionType;
26use risingwave_pb::data::{PbArray, PbArrayType, PbListArrayData};
27use serde::{Deserialize, Serialize};
28
29use super::{Array, ArrayBuilder};
30use crate::bitmap::{Bitmap, BitmapBuilder};
31use crate::types::{DataType, Scalar, ScalarRef, ToText};
32use crate::vector::{VectorInner, decode_vector_payload, encode_vector_payload};
33
/// Type of a single item stored inside a vector.
pub type VectorItemType = F32;
/// Numeric type presumably used for distances computed between vectors (usage is not
/// visible in this file).
pub type VectorDistanceType = f64;
/// [`DataType`] matching [`VectorItemType`].
pub const VECTOR_ITEM_TYPE: DataType = DataType::Float32;
/// [`DataType`] matching [`VectorDistanceType`].
pub const VECTOR_DISTANCE_TYPE: DataType = DataType::Float64;
38
/// Incrementally builds a [`VectorArray`] whose non-null rows all hold `elem_size` items.
#[derive(Debug, Clone, EstimateSize)]
pub struct VectorArrayBuilder {
    /// Validity of every appended row (`true` for a value, `false` for null).
    bitmap: BitmapBuilder,
    /// Starts as `[0]`; each appended row pushes the end position of its items in `inner`.
    /// Null rows repeat the previous offset because they add no items.
    offsets: Vec<u32>,
    /// Items of all non-null rows, stored back to back.
    inner: Vec<VectorItemType>,
    /// Number of items in every non-null row, taken from `DataType::Vector`.
    elem_size: usize,
}
46
47impl ArrayBuilder for VectorArrayBuilder {
48 type ArrayType = VectorArray;
49
50 #[cfg(not(test))]
51 fn new(_capacity: usize) -> Self {
52 panic!("please use `VectorArrayBuilder::with_type` instead");
53 }
54
55 #[cfg(test)]
56 fn new(capacity: usize) -> Self {
57 Self::with_type(capacity, VectorVal::test_type())
58 }
59
60 fn with_type(capacity: usize, ty: DataType) -> Self {
61 let DataType::Vector(elem_size) = ty else {
62 panic!("VectorArrayBuilder only supports Vector type");
63 };
64 let mut offsets = Vec::with_capacity(capacity + 1);
65 offsets.push(0);
66 Self {
67 bitmap: BitmapBuilder::with_capacity(capacity),
68 offsets,
69 inner: Vec::with_capacity(capacity * elem_size),
70 elem_size,
71 }
72 }
73
74 fn append_n(&mut self, n: usize, value: Option<VectorRef<'_>>) {
75 let last = self
76 .offsets
77 .last()
78 .cloned()
79 .expect("non-empty with an initial 0");
80 if let Some(value) = value {
81 assert_eq!(self.elem_size, value.inner.len());
82 self.inner.reserve(self.elem_size * n);
83 for _ in 0..n {
84 self.inner.extend_from_slice(value.inner);
85 }
86 self.offsets.reserve(n);
87 self.offsets.extend((1..=n).map(|i| {
88 last.checked_add((i * self.elem_size) as _)
89 .expect("overflow")
90 }));
91 self.bitmap.append_n(n, true);
92 } else {
93 self.offsets.reserve(n);
94 self.offsets.extend(repeat_n(last, n));
95 self.bitmap.append_n(n, false);
96 }
97 }
98
99 fn append_array(&mut self, other: &VectorArray) {
100 assert_eq!(self.elem_size, other.elem_size);
101 self.bitmap.append_bitmap(&other.bitmap);
102 let last = self
103 .offsets
104 .last()
105 .cloned()
106 .expect("non-empty with an initial 0");
107 let other_offsets = &other.offsets[1..];
108 self.offsets.reserve(other_offsets.len());
109 self.offsets.extend(
110 other_offsets
111 .iter()
112 .map(|offset| last.checked_add(*offset).expect("overflow")),
113 );
114 self.inner.reserve(other.inner.len());
115 self.inner.extend_from_slice(&other.inner);
116 }
117
118 fn pop(&mut self) -> Option<()> {
119 if self.bitmap.pop().is_some() {
120 self.offsets
121 .pop()
122 .expect("non-empty when bitmap popped Some");
123 let last = self
124 .offsets
125 .last()
126 .cloned()
127 .expect("non-empty with initial 0");
128 self.inner.truncate(last as _);
129 Some(())
130 } else {
131 None
132 }
133 }
134
135 fn len(&self) -> usize {
136 self.bitmap.len()
137 }
138
139 fn finish(self) -> VectorArray {
140 VectorArray {
141 bitmap: self.bitmap.finish(),
142 offsets: self.offsets,
143 inner: self.inner,
144 elem_size: self.elem_size,
145 }
146 }
147}
148
/// Array of vectors in which every non-null row holds `elem_size` items.
#[derive(Debug, Clone)]
pub struct VectorArray {
    /// Validity of each row.
    bitmap: Bitmap,
    /// One more entry than there are rows when produced by the builder: starts at 0 and
    /// stores, for each row, the end position of its items in `inner`.
    offsets: Vec<u32>,
    /// Items of all non-null rows, stored back to back.
    inner: Vec<VectorItemType>,
    /// Number of items in every non-null row.
    elem_size: usize,
}
158
159impl EstimateSize for VectorArray {
160 fn estimated_heap_size(&self) -> usize {
161 self.inner.estimated_heap_size()
162 }
163}
164
165impl Array for VectorArray {
166 type Builder = VectorArrayBuilder;
167 type OwnedItem = VectorVal;
168 type RefItem<'a> = VectorRef<'a>;
169
170 unsafe fn raw_value_at_unchecked(&self, idx: usize) -> Self::RefItem<'_> {
171 VectorRef {
172 inner: unsafe {
173 let start = self.inner.as_ptr().add(self.offsets[idx] as usize);
174 slice::from_raw_parts(start, self.elem_size)
175 },
176 }
177 }
178
179 fn len(&self) -> usize {
180 self.bitmap.len()
181 }
182
183 fn to_protobuf(&self) -> PbArray {
184 let mut payload = Vec::with_capacity(self.inner.len() * size_of::<VectorItemType>());
185 encode_vector_payload(self.inner.as_slice(), &mut payload);
186 PbArray {
187 array_type: PbArrayType::Vector as _,
188 null_bitmap: Some(self.bitmap.to_protobuf()),
189 values: vec![PbBuffer {
190 compression: PbCompressionType::None as _,
191 body: payload,
192 }],
193 struct_array_data: None,
194 list_array_data: Some(
195 PbListArrayData {
196 offsets: self.offsets.clone(),
197 value: None,
198 value_type: Some(DataType::Float32.to_protobuf()),
199 elem_size: Some(self.elem_size as _),
200 }
201 .into(),
202 ),
203 }
204 }
205
206 fn null_bitmap(&self) -> &Bitmap {
207 &self.bitmap
208 }
209
210 fn into_null_bitmap(self) -> Bitmap {
211 self.bitmap
212 }
213
214 fn set_bitmap(&mut self, bitmap: Bitmap) {
215 self.bitmap = bitmap;
216 }
217
218 fn data_type(&self) -> DataType {
219 DataType::Vector(self.elem_size)
220 }
221}
222
223impl VectorArray {
224 pub fn from_protobuf(
225 array: &risingwave_pb::data::PbArray,
226 ) -> super::ArrayResult<super::ArrayImpl> {
227 assert_eq!(
229 array.array_type,
230 PbArrayType::Vector as i32,
231 "invalid array type for vector: {}",
232 array.array_type
233 );
234 let bitmap: Bitmap = array.get_null_bitmap()?.into();
235 let encoded_payload = &array.values[0].body;
236 let payload = decode_vector_payload(
237 encoded_payload
238 .len()
239 .checked_exact_div(size_of::<VectorItemType>())
240 .unwrap_or_else(|| {
241 panic!("invalid payload len {} for vector", encoded_payload.len(),)
242 }),
243 array.values[0].body.as_slice(),
244 );
245 let array_data = array.get_list_array_data()?;
246 let elem_size = array_data.elem_size.expect("should exist for Vector") as usize;
247 let offsets = array_data.offsets.clone();
248 debug_assert_eq!(array_data.value_type, Some(DataType::Float32.to_protobuf()));
249 debug_assert_eq!(array_data.value, None);
250
251 Ok(VectorArray {
252 bitmap,
253 offsets,
254 inner: payload,
255 elem_size,
256 }
257 .into())
258 }
259
260 pub fn as_raw_slice(&self) -> &[f32] {
261 F32::inner_slice(&self.inner)
262 }
263
264 pub fn offsets(&self) -> &[u32] {
265 &self.offsets
266 }
267}
268
/// Owned vector value backed by a boxed slice of items.
pub type VectorVal = VectorInner<Box<[VectorItemType]>>;
270
271impl Debug for VectorVal {
272 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
273 self.to_ref().fmt(f)
274 }
275}
276
277impl Scalar for VectorVal {
278 type ScalarRefType<'a> = VectorRef<'a>;
279
280 fn as_scalar_ref(&self) -> VectorRef<'_> {
281 self.to_ref()
282 }
283}
284
285impl VectorVal {
286 #[cfg(test)]
287 pub const TEST_VECTOR_DIMENSION: usize = 3;
288
289 pub fn from_text(text: &str, size: usize) -> Result<Self, String> {
290 let text = text.trim();
291 let text = text
292 .strip_prefix('[')
293 .ok_or_else(|| "vector must start with [".to_owned())?
294 .strip_suffix(']')
295 .ok_or_else(|| "vector must end with ]".to_owned())?;
296 let inner = text
297 .split(',')
298 .map(|s| {
299 s.trim()
300 .parse::<f32>()
301 .map_err(|_| format!("invalid f32: {s}"))
302 .and_then(|f| {
303 if f.is_finite() {
304 Ok(f.into())
305 } else {
306 Err(format!("{f} not allowed in vector"))
307 }
308 })
309 })
310 .collect::<Result<Vec<_>, _>>()?;
311 if inner.len() != size {
312 return Err(format!("expected {} dimensions, not {}", size, inner.len()));
313 }
314 Ok(Self {
315 inner: inner.into(),
316 })
317 }
318
319 #[cfg(test)]
320 pub fn test_type() -> DataType {
321 DataType::Vector(Self::TEST_VECTOR_DIMENSION)
322 }
323
324 pub fn to_ref(&self) -> VectorRef<'_> {
325 VectorRef { inner: &self.inner }
326 }
327}
328
/// An `f32` that has been verified to be finite (neither NaN nor infinite) when built
/// through [`TryFrom<f32>`].
#[derive(Clone, Copy, Debug)]
#[repr(transparent)]
pub struct Finite32(f32);

impl TryFrom<f32> for Finite32 {
    type Error = String;

    /// Accepts finite values only; NaN and infinities produce an error message.
    fn try_from(value: f32) -> Result<Self, Self::Error> {
        if !value.is_finite() {
            return Err(format!("{value} not allowed in vector"));
        }
        Ok(Self(value))
    }
}
345
346impl From<Vec<Finite32>> for VectorVal {
347 fn from(value: Vec<Finite32>) -> Self {
348 let (ptr, len, cap) = value.into_raw_parts();
349 Self {
351 inner: unsafe { Vec::from_raw_parts(ptr as *mut F32, len, cap).into_boxed_slice() },
352 }
353 }
354}
355
356impl FromIterator<Finite32> for VectorVal {
357 fn from_iter<I: IntoIterator<Item = Finite32>>(iter: I) -> Self {
358 let inner = iter.into_iter().collect_vec();
359 Self::from(inner)
360 }
361}
362
/// Borrowed vector value backed by a slice of items.
pub type VectorRef<'a> = VectorInner<&'a [VectorItemType]>;
364
365impl Debug for VectorRef<'_> {
366 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
367 self.write_with_type(&DataType::Vector(self.dimension()), f)
368 }
369}
370
371impl ToText for VectorRef<'_> {
372 fn write<W: std::fmt::Write>(&self, f: &mut W) -> std::fmt::Result {
373 self.write_with_type(&DataType::Vector(self.dimension()), f)
374 }
375
376 fn write_with_type<W: std::fmt::Write>(&self, _ty: &DataType, f: &mut W) -> std::fmt::Result {
377 write!(f, "[")?;
378 for (i, item) in self.inner.iter().enumerate() {
379 if i > 0 {
380 write!(f, ",")?;
381 }
382 write!(f, "{}", item)?;
383 }
384 write!(f, "]")
385 }
386}
387
388impl<'a> ScalarRef<'a> for VectorRef<'a> {
389 type ScalarType = VectorVal;
390
391 fn to_owned_scalar(&self) -> VectorVal {
392 VectorVal {
393 inner: self.inner.to_vec().into_boxed_slice(),
394 }
395 }
396
397 fn hash_scalar<H: std::hash::Hasher>(&self, state: &mut H) {
398 self.inner.hash(state);
399 }
400}
401
402impl<'a> VectorRef<'a> {
403 pub fn from_slice_unchecked(inner: &'a [VectorItemType]) -> Self {
406 Self { inner }
407 }
408
409 pub fn memcmp_serialize(
410 self,
411 serializer: &mut memcomparable::Serializer<impl BufMut>,
412 ) -> memcomparable::Result<()> {
413 for item in self.inner {
414 item.serialize(&mut *serializer)?;
415 }
416 Ok(())
417 }
418}
419
420impl VectorVal {
421 pub fn memcmp_deserialize(
422 dimension: usize,
423 de: &mut memcomparable::Deserializer<impl Buf>,
424 ) -> memcomparable::Result<Self> {
425 let mut value = Vec::with_capacity(dimension);
426 for _ in 0..dimension {
427 value.push(Finite32::try_from(f32::deserialize(&mut *de)?).map_err(Error::Message)?)
428 }
429 Ok(VectorVal::from(value))
430 }
431}