risingwave_storage/hummock/vector/
file.rs1use std::mem::take;
16use std::slice;
17use std::sync::Arc;
18
19use bytes::{Buf, BufMut, Bytes, BytesMut};
20use futures::future::BoxFuture;
21use risingwave_hummock_sdk::HummockVectorFileId;
22use risingwave_hummock_sdk::vector_index::VectorFileInfo;
23use risingwave_object_store::object::ObjectStreamingUploader;
24
25use crate::hummock::{HummockError, HummockResult, xxhash64_checksum, xxhash64_verify};
26use crate::vector::VectorRef;
27
/// On-disk format version, written into every vector file footer just before the magic number.
const VECTOR_FILE_VERSION: u32 = 1;
/// Magic number terminating every vector file; it is the first field checked when a footer is decoded.
const VECTOR_FILE_MAGIC_NUM: u32 = 0x3866_cd92;
30
31#[cfg_attr(test, derive(PartialEq, Debug))]
32pub struct VectorBlockInner {
33 dimension: usize,
34 vector_payload: Vec<f32>,
35 info_payload: Vec<u8>,
36 info_offset: Vec<u32>,
37}
38
39impl VectorBlockInner {
40 pub fn count(&self) -> usize {
41 self.info_offset.len()
42 }
43
44 pub fn vec_ref(&self, idx: usize) -> VectorRef<'_> {
45 let start = idx * self.dimension;
46 let end = start + self.dimension;
47 VectorRef::from_slice(&self.vector_payload[start..end])
48 }
49
50 pub fn info(&self, idx: usize) -> &[u8] {
51 let start = if idx == 0 {
52 0
53 } else {
54 self.info_offset[idx - 1] as usize
55 };
56 let end = self.info_offset[idx] as usize;
57 &self.info_payload[start..end]
58 }
59
60 pub fn encoded_len(&self) -> usize {
61 size_of::<u32>() + size_of::<u32>() + self.info_offset.len() * size_of::<u32>() + self.info_payload.len() + self.vector_payload.len() * size_of::<f32>() }
67}
68
69pub struct VectorBlockBuilder {
70 inner: VectorBlockInner,
71 encoded_len: usize,
72}
73
74impl VectorBlockBuilder {
75 pub fn new(dimension: usize) -> Self {
76 Self {
77 inner: VectorBlockInner {
78 dimension,
79 vector_payload: vec![],
80 info_payload: vec![],
81 info_offset: vec![],
82 },
83 encoded_len: size_of::<u32>() + size_of::<u32>(), }
86 }
87
88 pub fn add(&mut self, vec: VectorRef<'_>, info: &[u8]) {
89 let slice = vec.as_slice();
90 assert_eq!(self.inner.dimension, slice.len());
91 self.inner.vector_payload.extend_from_slice(slice);
92 self.inner.info_payload.extend_from_slice(info);
93 let offset = self.inner.info_payload.len();
94 self.inner.info_offset.push(
95 offset
96 .try_into()
97 .unwrap_or_else(|_| panic!("offset {} overflow", offset)),
98 );
99 self.encoded_len += size_of::<u32>() + size_of_val(slice) + info.len()
100 }
101
102 pub fn encoded_len(&self) -> usize {
103 debug_assert_eq!(self.encoded_len, self.inner.encoded_len());
104 self.encoded_len
105 }
106
107 pub fn finish(self) -> Option<VectorBlock> {
108 if !self.inner.info_offset.is_empty() {
109 Some(VectorBlock(Arc::new(self.inner)))
110 } else {
111 None
112 }
113 }
114}
115
116#[cfg_attr(test, derive(PartialEq, Debug))]
117#[derive(Clone)]
118pub struct VectorBlock(Arc<VectorBlockInner>);
119
120impl VectorBlock {
121 pub fn count(&self) -> usize {
122 self.0.count()
123 }
124
125 pub fn vec_ref(&self, idx: usize) -> VectorRef<'_> {
126 self.0.vec_ref(idx)
127 }
128
129 pub fn info(&self, idx: usize) -> &[u8] {
130 self.0.info(idx)
131 }
132
133 fn encode_payload(&self, mut buf: impl BufMut) {
142 buf.put_u32_le(self.0.dimension.try_into().unwrap());
143 let vector_count = self.0.info_offset.len();
144 assert!(vector_count > 0);
145 buf.put_u32_le(
146 vector_count
147 .try_into()
148 .unwrap_or_else(|_| panic!("vector count {} overflow", vector_count)),
149 );
150 for offset in &self.0.info_offset {
151 buf.put_u32_le(*offset);
152 }
153 let last_offset = *self.0.info_offset.last().unwrap();
154 assert_eq!(last_offset as usize, self.0.info_payload.len());
155 buf.put_slice(&self.0.info_payload);
156 assert_eq!(self.0.vector_payload.len(), self.0.dimension * vector_count);
157 let vector_payload_ptr = self.0.vector_payload.as_slice().as_ptr() as *const u8;
158 let vector_payload_slice = unsafe {
160 slice::from_raw_parts(
161 vector_payload_ptr,
162 self.0.vector_payload.len() * size_of::<f32>(),
163 )
164 };
165 buf.put_slice(vector_payload_slice);
166 }
167
168 fn decode_payload(mut buf: impl Buf) -> Self {
169 let dimension: usize = buf.get_u32_le().try_into().unwrap();
170 let vector_count = buf.get_u32_le() as usize;
171 let mut info_offset = Vec::with_capacity(vector_count);
172 for _ in 0..vector_count {
173 let offset = buf.get_u32_le();
174 info_offset.push(offset);
175 }
176 let info_payload_len = *info_offset.last().expect("non-empty") as usize;
177 let mut info_payload = vec![0; info_payload_len];
178 buf.copy_to_slice(&mut info_payload);
179 let vector_item_count = dimension * vector_count;
180 let mut vector_payload = Vec::with_capacity(vector_item_count);
181
182 let vector_payload_ptr = vector_payload.spare_capacity_mut().as_mut_ptr() as *mut u8;
183 let vector_payload_slice = unsafe {
185 slice::from_raw_parts_mut(vector_payload_ptr, vector_item_count * size_of::<f32>())
186 };
187 buf.copy_to_slice(vector_payload_slice);
188 unsafe {
190 vector_payload.set_len(vector_item_count);
191 }
192 Self(Arc::new(VectorBlockInner {
193 dimension,
194 vector_payload,
195 info_payload,
196 info_offset,
197 }))
198 }
199
200 fn encode(&self, encoded_block_len: usize) -> Bytes {
201 let encoded_len = encoded_block_len + size_of::<u64>(); let mut encoded_block = BytesMut::with_capacity(encoded_len);
203 self.encode_payload(&mut encoded_block);
204 let checksum = xxhash64_checksum(encoded_block.as_ref());
205 encoded_block.put_u64_le(checksum);
206 let encoded_block = encoded_block.freeze();
207 debug_assert_eq!(encoded_block.len(), encoded_len);
208 encoded_block
209 }
210
211 pub fn decode(buf: &[u8]) -> HummockResult<Self> {
212 if buf.len() < size_of::<u64>() {
213 return Err(HummockError::decode_error("block too short"));
214 }
215 let back_cursor_end = buf.len();
216 let back_cursor_start = back_cursor_end - size_of::<u64>();
217 let payload = &buf[..back_cursor_start];
218 {
219 let checksum =
220 u64::from_le_bytes(buf[back_cursor_start..back_cursor_end].try_into().unwrap());
221 xxhash64_verify(payload, checksum)?;
222 }
223 Ok(Self::decode_payload(payload))
224 }
225}
226
227impl<'a> IntoIterator for &'a VectorBlock {
228 type Item = (VectorRef<'a>, &'a [u8]);
229
230 type IntoIter = impl Iterator<Item = Self::Item>;
231
232 fn into_iter(self) -> Self::IntoIter {
233 (0..self.0.info_offset.len()).map(|i| (self.vec_ref(i), self.info(i)))
234 }
235}
236
237pub struct VectorBlockMeta {
238 pub offset: usize,
239 pub block_size: usize,
240 pub vector_count: usize,
241 pub start_vector_id: usize,
242}
243
244impl VectorBlockMeta {
245 fn encode(&self, mut buf: impl BufMut) {
246 buf.put_u64_le(self.offset.try_into().unwrap());
247 buf.put_u64_le(self.block_size.try_into().unwrap());
248 buf.put_u64_le(self.vector_count.try_into().unwrap());
249 buf.put_u64_le(self.start_vector_id.try_into().unwrap());
250 }
251
252 fn encoded_len() -> usize {
253 size_of::<u64>() * 4 }
255
256 fn decode(mut buf: impl Buf) -> Self {
257 let offset = buf.get_u64_le().try_into().unwrap();
258 let block_size = buf.get_u64_le().try_into().unwrap();
259 let vector_count = buf.get_u64_le().try_into().unwrap();
260 let start_vector_id = buf.get_u64_le().try_into().unwrap();
261 Self {
262 offset,
263 block_size,
264 vector_count,
265 start_vector_id,
266 }
267 }
268}
269
270pub struct VectorFileMeta {
271 pub block_metas: Vec<VectorBlockMeta>,
272}
273
274impl VectorFileMeta {
275 fn encode(&self, mut buf: impl BufMut) {
276 buf.put_u32_le(self.block_metas.len().try_into().unwrap());
277 for meta in &self.block_metas {
278 meta.encode(&mut buf);
279 }
280 }
281
282 fn encode_len(&self) -> usize {
283 size_of::<u32>() + self.block_metas.len() * VectorBlockMeta::encoded_len()
285 }
286
287 fn decode(mut buf: impl Buf) -> Self {
288 let block_count = buf.get_u32_le() as usize;
289 let mut block_metas = Vec::with_capacity(block_count);
290 for _ in 0..block_count {
291 block_metas.push(VectorBlockMeta::decode(&mut buf));
292 }
293 Self { block_metas }
294 }
295
296 fn preserved_footer_len() -> usize {
297 size_of::<u64>() + size_of::<u64>() + size_of::<u32>() + size_of::<u32>() }
302
303 fn encode_footer(&self, mete_offset: usize) -> Bytes {
304 let encoded_footer_len = self.encode_len() + Self::preserved_footer_len();
305 let mut encoded_footer = BytesMut::with_capacity(encoded_footer_len);
306 self.encode(&mut encoded_footer);
307 let checksum = xxhash64_checksum(encoded_footer.as_ref());
308 encoded_footer.put_u64_le(checksum);
309 encoded_footer.put_u64_le(mete_offset.try_into().unwrap());
310 encoded_footer.put_u32_le(VECTOR_FILE_VERSION);
311 encoded_footer.put_u32_le(VECTOR_FILE_MAGIC_NUM);
312 let encoded_footer = encoded_footer.freeze();
313 debug_assert_eq!(encoded_footer.len(), encoded_footer_len);
314 encoded_footer
315 }
316
317 pub fn decode_footer(buf: &[u8]) -> HummockResult<Self> {
318 if buf.len() < Self::preserved_footer_len() {
319 return Err(HummockError::decode_error("footer too short"));
320 }
321 let mut back_cursor_end = buf.len();
323 let mut back_cursor_start = back_cursor_end - size_of::<u32>();
324 {
325 let magic =
326 u32::from_le_bytes(buf[back_cursor_start..back_cursor_end].try_into().unwrap());
327 if magic != VECTOR_FILE_MAGIC_NUM {
328 return Err(HummockError::magic_mismatch(VECTOR_FILE_MAGIC_NUM, magic));
329 }
330 }
331 back_cursor_end = back_cursor_start;
333 back_cursor_start = back_cursor_end - size_of::<u32>();
334 {
335 let file_version =
336 u32::from_le_bytes(buf[back_cursor_start..back_cursor_end].try_into().unwrap());
337 if file_version != VECTOR_FILE_VERSION {
338 return Err(HummockError::invalid_format_version(file_version));
339 }
340 }
341
342 back_cursor_end = back_cursor_start;
344 back_cursor_start = back_cursor_end - size_of::<u64>();
345
346 back_cursor_end = back_cursor_start;
348 back_cursor_start = back_cursor_end - size_of::<u64>();
349 let payload = &buf[..back_cursor_start];
350 {
351 let checksum =
352 u64::from_le_bytes(buf[back_cursor_start..back_cursor_end].try_into().unwrap());
353 xxhash64_verify(payload, checksum)?;
354 }
355
356 Ok(Self::decode(payload))
357 }
358}
359
360pub type NewStreamingUploaderFn = Box<
361 dyn Fn() -> BoxFuture<'static, HummockResult<(HummockVectorFileId, ObjectStreamingUploader)>>
362 + Send
363 + Sync,
364>;
365
366pub struct VectorFileBuilder {
367 dimension: usize,
368 new_uploader: NewStreamingUploaderFn,
369 max_block_size: usize,
370
371 block_metas: Vec<VectorBlockMeta>,
373 blocks: Vec<VectorBlock>,
374 building_block: Option<(VectorBlockBuilder, usize)>,
375 next_vector_id: usize,
376 next_block_offset: usize,
377 uploader: Option<(HummockVectorFileId, ObjectStreamingUploader)>,
378}
379
380impl VectorFileBuilder {
381 pub fn new(
382 dimension: usize,
383 new_uploader: NewStreamingUploaderFn,
384 next_vector_id: usize,
385 max_block_size: usize,
386 ) -> Self {
387 Self {
388 dimension,
389 new_uploader,
390 max_block_size,
391 block_metas: Vec::new(),
392 blocks: Vec::new(),
393 building_block: None,
394 uploader: None,
395 next_vector_id,
396 next_block_offset: 0,
397 }
398 }
399
400 pub fn add(&mut self, vec: VectorRef<'_>, info: &[u8]) {
401 let (builder, _) = self
402 .building_block
403 .get_or_insert_with(|| (VectorBlockBuilder::new(self.dimension), self.next_vector_id));
404 builder.add(vec, info);
405 self.next_vector_id += 1;
406 }
407
408 pub async fn finish(
409 &mut self,
410 ) -> HummockResult<Option<(VectorFileInfo, Vec<VectorBlock>, VectorFileMeta)>> {
411 self.flush_inner().await?;
412 assert!(self.building_block.is_none(), "unfinished block builder");
413 let ret = if let Some((object_id, mut uploader)) = self.uploader.take() {
414 assert!(!self.block_metas.is_empty());
415 let start_vector_id = self.block_metas.get(0).expect("non-empty").start_vector_id;
416 let meta = VectorFileMeta {
417 block_metas: take(&mut self.block_metas),
418 };
419 let meta_offset = self.next_block_offset;
420 let encoded_footer = meta.encode_footer(meta_offset);
421 let file_size = meta_offset + encoded_footer.len();
422 uploader.write_bytes(encoded_footer).await?;
423 uploader.finish().await?;
424 let file_info = VectorFileInfo {
425 object_id,
426 file_size: file_size.try_into().unwrap(),
427 start_vector_id,
428 vector_count: self.blocks.iter().map(|b| b.count()).sum::<usize>(),
429 meta_offset,
430 };
431 self.next_block_offset = 0;
432 Ok(Some((file_info, take(&mut self.blocks), meta)))
433 } else {
434 Ok(None)
435 };
436 assert!(self.is_empty(), "builder not empty after finish");
437 assert_eq!(
438 self.next_block_offset, 0,
439 "next_block_offset should be zero after finish"
440 );
441 ret
442 }
443
444 pub fn is_empty(&self) -> bool {
445 self.building_block.is_none() && self.blocks.is_empty() && self.block_metas.is_empty()
446 }
447
448 pub fn next_vector_id(&self) -> usize {
449 self.next_vector_id
450 }
451
452 pub async fn try_flush(&mut self) -> HummockResult<()> {
453 if let Some((builder, _)) = &self.building_block
454 && builder.encoded_len() >= self.max_block_size
455 {
456 self.flush_inner().await?;
457 }
458 Ok(())
459 }
460
461 async fn flush_inner(&mut self) -> HummockResult<()> {
462 if let Some((builder, start_vector_id)) = self.building_block.take() {
463 let encoded_block_len = builder.encoded_len();
464 if let Some(block) = builder.finish() {
465 let (_, uploader) = match &mut self.uploader {
466 Some(uploader) => uploader,
467 None => self.uploader.insert((self.new_uploader)().await?),
468 };
469 let encoded_block = block.encode(encoded_block_len);
470 let block_meta = VectorBlockMeta {
471 offset: self.next_block_offset,
472 block_size: encoded_block.len(),
473 vector_count: block.count(),
474 start_vector_id,
475 };
476 self.next_block_offset += encoded_block.len();
477 uploader.write_bytes(encoded_block).await?;
478 self.block_metas.push(block_meta);
479 self.blocks.push(block);
480 }
481 }
482 Ok(())
483 }
484}
485
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use risingwave_common::util::iter_util::ZipEqDebug;

    use crate::hummock::vector::file::{VectorBlock, VectorBlockBuilder};
    use crate::vector::test_utils::{gen_info, gen_vector};

    const VECTOR_DIM: usize = 128;

    /// Builds a block from generated vectors and round-trips its payload encoding. It checks the
    /// predicted encoded length, the decoded block, and the iterated `(vector, info)` pairs.
    #[test]
    fn test_basic() {
        let input = (0..200)
            .map(|i| (gen_vector(VECTOR_DIM), gen_info(i)))
            .collect_vec();

        let mut builder = VectorBlockBuilder::new(VECTOR_DIM);
        input
            .iter()
            .for_each(|(vector, info)| builder.add(vector.to_ref(), info));
        let predicted_len = builder.encoded_len();
        let block = builder.finish().unwrap();

        let mut payload = Vec::new();
        block.encode_payload(&mut payload);
        assert_eq!(predicted_len, payload.len());

        let decoded = VectorBlock::decode_payload(payload.as_slice());
        assert_eq!(block, decoded);

        for ((expected_vec, expected_info), (actual_vec, actual_info)) in
            input.iter().zip_eq_debug(&block)
        {
            assert_eq!(expected_vec.to_ref(), actual_vec);
            assert_eq!(expected_info.iter().as_slice(), actual_info);
        }
    }

    /// A builder that never received a vector produces no block.
    #[test]
    fn test_empty_builder() {
        assert!(VectorBlockBuilder::new(VECTOR_DIM).finish().is_none());
    }
}