1use std::mem::take;
16use std::sync::Arc;
17
18use bytes::{Buf, BufMut, Bytes, BytesMut};
19use futures::future::BoxFuture;
20use risingwave_common::array::{VectorItemType, VectorRef};
21use risingwave_common::vector::{decode_vector_payload, encode_vector_payload};
22use risingwave_hummock_sdk::HummockVectorFileId;
23use risingwave_hummock_sdk::vector_index::{HnswFlatIndex, VectorFileInfo};
24use risingwave_object_store::object::ObjectStreamingUploader;
25
26use crate::hummock::vector::writer::{VectorObjectIdManagerRef, new_vector_file_builder};
27use crate::hummock::vector::{EnumVectorAccessor, get_vector_block, search_vector};
28use crate::hummock::{
29 HummockError, HummockResult, SstableStoreRef, xxhash64_checksum, xxhash64_verify,
30};
31use crate::opts::StorageOpts;
32use crate::vector::hnsw::VectorStore;
33
/// Format version written into every vector file footer; `decode_footer` rejects any other value.
const VECTOR_FILE_VERSION: u32 = 1;
/// Magic number stored in the last 4 bytes of every vector file footer, checked first on decode.
const VECTOR_FILE_MAGIC_NUM: u32 = 0x3866cd92;
36
/// In-memory contents of a vector block: a run of fixed-dimension vectors, each paired with an
/// opaque variable-length info byte string.
#[cfg_attr(test, derive(PartialEq, Debug))]
pub struct VectorBlockInner {
    /// Number of items in every vector of this block.
    dimension: usize,
    /// All vectors concatenated back to back; vector `i` occupies
    /// `[i * dimension, (i + 1) * dimension)`.
    vector_payload: Vec<VectorItemType>,
    /// All info byte strings concatenated back to back.
    info_payload: Vec<u8>,
    /// Exclusive end offset in `info_payload` of each vector's info; entry `i` starts where
    /// entry `i - 1` ends (or at 0). Its length is the number of vectors in the block.
    info_offset: Vec<u32>,
}
44
45impl VectorBlockInner {
46 pub fn count(&self) -> usize {
47 self.info_offset.len()
48 }
49
50 pub fn vec_ref(&self, idx: usize) -> VectorRef<'_> {
51 let start = idx * self.dimension;
52 let end = start + self.dimension;
53 VectorRef::from_slice_unchecked(&self.vector_payload[start..end])
54 }
55
56 pub fn info(&self, idx: usize) -> &[u8] {
57 let start = if idx == 0 {
58 0
59 } else {
60 self.info_offset[idx - 1] as usize
61 };
62 let end = self.info_offset[idx] as usize;
63 &self.info_payload[start..end]
64 }
65
66 pub fn encoded_len(&self) -> usize {
67 size_of::<u32>() + size_of::<u32>() + self.info_offset.len() * size_of::<u32>() + self.info_payload.len() + self.vector_payload.len() * size_of::<f32>() }
73}
74
/// Incrementally accumulates vectors and their info into a `VectorBlockInner`.
pub struct VectorBlockBuilder {
    /// Block contents built so far.
    inner: VectorBlockInner,
    /// Running encoded size of `inner`, maintained by `add` so callers can check it cheaply;
    /// cross-checked against `VectorBlockInner::encoded_len` in debug builds.
    encoded_len: usize,
}
79
80impl VectorBlockBuilder {
81 pub fn new(dimension: usize) -> Self {
82 Self {
83 inner: VectorBlockInner {
84 dimension,
85 vector_payload: vec![],
86 info_payload: vec![],
87 info_offset: vec![],
88 },
89 encoded_len: size_of::<u32>() + size_of::<u32>(), }
92 }
93
94 pub fn vec_ref(&self, idx: usize) -> VectorRef<'_> {
95 self.inner.vec_ref(idx)
96 }
97
98 pub fn info(&self, idx: usize) -> &[u8] {
99 self.inner.info(idx)
100 }
101
102 pub fn add(&mut self, vec: VectorRef<'_>, info: &[u8]) {
103 let slice = vec.as_slice();
104 assert_eq!(self.inner.dimension, slice.len());
105 self.inner.vector_payload.extend_from_slice(slice);
106 self.inner.info_payload.extend_from_slice(info);
107 let offset = self.inner.info_payload.len();
108 self.inner.info_offset.push(
109 offset
110 .try_into()
111 .unwrap_or_else(|_| panic!("offset {} overflow", offset)),
112 );
113 self.encoded_len += size_of::<u32>() + size_of_val(slice) + info.len()
114 }
115
116 pub fn encoded_len(&self) -> usize {
117 debug_assert_eq!(self.encoded_len, self.inner.encoded_len());
118 self.encoded_len
119 }
120
121 pub fn finish(self) -> Option<VectorBlock> {
122 if !self.inner.info_offset.is_empty() {
123 Some(VectorBlock(Arc::new(self.inner)))
124 } else {
125 None
126 }
127 }
128}
129
/// Immutable, reference-counted vector block; `clone` only bumps the `Arc` count.
#[cfg_attr(test, derive(PartialEq, Debug))]
#[derive(Clone)]
pub struct VectorBlock(Arc<VectorBlockInner>);
133
134impl VectorBlock {
135 pub fn count(&self) -> usize {
136 self.0.count()
137 }
138
139 pub fn vec_ref(&self, idx: usize) -> VectorRef<'_> {
140 self.0.vec_ref(idx)
141 }
142
143 pub fn info(&self, idx: usize) -> &[u8] {
144 self.0.info(idx)
145 }
146
147 fn encode_payload(&self, mut buf: impl BufMut) {
156 buf.put_u32_le(self.0.dimension.try_into().unwrap());
157 let vector_count = self.0.info_offset.len();
158 assert!(vector_count > 0);
159 buf.put_u32_le(
160 vector_count
161 .try_into()
162 .unwrap_or_else(|_| panic!("vector count {} overflow", vector_count)),
163 );
164 for offset in &self.0.info_offset {
165 buf.put_u32_le(*offset);
166 }
167 let last_offset = *self.0.info_offset.last().unwrap();
168 assert_eq!(last_offset as usize, self.0.info_payload.len());
169 buf.put_slice(&self.0.info_payload);
170 assert_eq!(self.0.vector_payload.len(), self.0.dimension * vector_count);
171 encode_vector_payload(&self.0.vector_payload, &mut buf);
172 }
173
174 fn decode_payload(mut buf: impl Buf) -> Self {
175 let dimension: usize = buf.get_u32_le().try_into().unwrap();
176 let vector_count = buf.get_u32_le() as usize;
177 let mut info_offset = Vec::with_capacity(vector_count);
178 for _ in 0..vector_count {
179 let offset = buf.get_u32_le();
180 info_offset.push(offset);
181 }
182 let info_payload_len = *info_offset.last().expect("non-empty") as usize;
183 let mut info_payload = vec![0; info_payload_len];
184 buf.copy_to_slice(&mut info_payload);
185 let vector_item_count = dimension * vector_count;
186 let vector_payload = decode_vector_payload(vector_item_count, buf);
187
188 Self(Arc::new(VectorBlockInner {
189 dimension,
190 vector_payload,
191 info_payload,
192 info_offset,
193 }))
194 }
195
196 fn encode(&self, encoded_block_len: usize) -> Bytes {
197 let encoded_len = encoded_block_len + size_of::<u64>(); let mut encoded_block = BytesMut::with_capacity(encoded_len);
199 self.encode_payload(&mut encoded_block);
200 let checksum = xxhash64_checksum(encoded_block.as_ref());
201 encoded_block.put_u64_le(checksum);
202 let encoded_block = encoded_block.freeze();
203 debug_assert_eq!(encoded_block.len(), encoded_len);
204 encoded_block
205 }
206
207 pub fn decode(buf: &[u8]) -> HummockResult<Self> {
208 if buf.len() < size_of::<u64>() {
209 return Err(HummockError::decode_error("block too short"));
210 }
211 let back_cursor_end = buf.len();
212 let back_cursor_start = back_cursor_end - size_of::<u64>();
213 let payload = &buf[..back_cursor_start];
214 {
215 let checksum =
216 u64::from_le_bytes(buf[back_cursor_start..back_cursor_end].try_into().unwrap());
217 xxhash64_verify(payload, checksum)?;
218 }
219 Ok(Self::decode_payload(payload))
220 }
221}
222
223impl<'a> IntoIterator for &'a VectorBlock {
224 type Item = (VectorRef<'a>, &'a [u8]);
225
226 type IntoIter = impl Iterator<Item = Self::Item>;
227
228 fn into_iter(self) -> Self::IntoIter {
229 (0..self.0.info_offset.len()).map(|i| (self.vec_ref(i), self.info(i)))
230 }
231}
232
/// Index entry describing one encoded block inside a vector file.
#[derive(Debug)]
pub struct VectorBlockMeta {
    /// Byte offset of the encoded block within the file (blocks are written back to back from 0).
    pub offset: usize,
    /// Encoded size of the block in bytes, including its trailing checksum.
    pub block_size: usize,
    /// Number of vectors stored in the block.
    pub vector_count: usize,
    /// Id of the first vector in the block; ids are consecutive within the block.
    pub start_vector_id: usize,
}
240
241impl VectorBlockMeta {
242 fn encode(&self, mut buf: impl BufMut) {
243 buf.put_u64_le(self.offset.try_into().unwrap());
244 buf.put_u64_le(self.block_size.try_into().unwrap());
245 buf.put_u64_le(self.vector_count.try_into().unwrap());
246 buf.put_u64_le(self.start_vector_id.try_into().unwrap());
247 }
248
249 fn encoded_len() -> usize {
250 size_of::<u64>() * 4 }
252
253 fn decode(mut buf: impl Buf) -> Self {
254 let offset = buf.get_u64_le().try_into().unwrap();
255 let block_size = buf.get_u64_le().try_into().unwrap();
256 let vector_count = buf.get_u64_le().try_into().unwrap();
257 let start_vector_id = buf.get_u64_le().try_into().unwrap();
258 Self {
259 offset,
260 block_size,
261 vector_count,
262 start_vector_id,
263 }
264 }
265}
266
/// Block index of a vector file, serialized into the file footer.
pub struct VectorFileMeta {
    /// One entry per block, in file order.
    pub block_metas: Vec<VectorBlockMeta>,
}
270
271impl VectorFileMeta {
272 fn encode(&self, mut buf: impl BufMut) {
273 buf.put_u32_le(self.block_metas.len().try_into().unwrap());
274 for meta in &self.block_metas {
275 meta.encode(&mut buf);
276 }
277 }
278
279 fn encode_len(&self) -> usize {
280 size_of::<u32>() + self.block_metas.len() * VectorBlockMeta::encoded_len()
282 }
283
284 fn decode(mut buf: impl Buf) -> Self {
285 let block_count = buf.get_u32_le() as usize;
286 let mut block_metas = Vec::with_capacity(block_count);
287 for _ in 0..block_count {
288 block_metas.push(VectorBlockMeta::decode(&mut buf));
289 }
290 Self { block_metas }
291 }
292
293 fn preserved_footer_len() -> usize {
294 size_of::<u64>() + size_of::<u64>() + size_of::<u32>() + size_of::<u32>() }
299
300 fn encode_footer(&self, mete_offset: usize) -> Bytes {
301 let encoded_footer_len = self.encode_len() + Self::preserved_footer_len();
302 let mut encoded_footer = BytesMut::with_capacity(encoded_footer_len);
303 self.encode(&mut encoded_footer);
304 let checksum = xxhash64_checksum(encoded_footer.as_ref());
305 encoded_footer.put_u64_le(checksum);
306 encoded_footer.put_u64_le(mete_offset.try_into().unwrap());
307 encoded_footer.put_u32_le(VECTOR_FILE_VERSION);
308 encoded_footer.put_u32_le(VECTOR_FILE_MAGIC_NUM);
309 let encoded_footer = encoded_footer.freeze();
310 debug_assert_eq!(encoded_footer.len(), encoded_footer_len);
311 encoded_footer
312 }
313
314 pub fn decode_footer(buf: &[u8]) -> HummockResult<Self> {
315 if buf.len() < Self::preserved_footer_len() {
316 return Err(HummockError::decode_error("footer too short"));
317 }
318 let mut back_cursor_end = buf.len();
320 let mut back_cursor_start = back_cursor_end - size_of::<u32>();
321 {
322 let magic =
323 u32::from_le_bytes(buf[back_cursor_start..back_cursor_end].try_into().unwrap());
324 if magic != VECTOR_FILE_MAGIC_NUM {
325 return Err(HummockError::magic_mismatch(VECTOR_FILE_MAGIC_NUM, magic));
326 }
327 }
328 back_cursor_end = back_cursor_start;
330 back_cursor_start = back_cursor_end - size_of::<u32>();
331 {
332 let file_version =
333 u32::from_le_bytes(buf[back_cursor_start..back_cursor_end].try_into().unwrap());
334 if file_version != VECTOR_FILE_VERSION {
335 return Err(HummockError::invalid_format_version(file_version));
336 }
337 }
338
339 back_cursor_end = back_cursor_start;
341 back_cursor_start = back_cursor_end - size_of::<u64>();
342
343 back_cursor_end = back_cursor_start;
345 back_cursor_start = back_cursor_end - size_of::<u64>();
346 let payload = &buf[..back_cursor_start];
347 {
348 let checksum =
349 u64::from_le_bytes(buf[back_cursor_start..back_cursor_end].try_into().unwrap());
350 xxhash64_verify(payload, checksum)?;
351 }
352
353 Ok(Self::decode(payload))
354 }
355}
356
/// Factory returning a vector file id together with a streaming uploader for that file.
/// `VectorFileBuilder` invokes it lazily, the first time a block of a new file is flushed.
pub type NewStreamingUploaderFn = Box<
    dyn Fn() -> BoxFuture<'static, HummockResult<(HummockVectorFileId, ObjectStreamingUploader)>>
        + Send
        + Sync,
>;
362
/// Streams vectors into a vector file block by block, keeping finished blocks in memory so they
/// remain readable (and can be handed to a cache) after `finish`.
pub struct VectorFileBuilder {
    /// Number of items in every vector.
    dimension: usize,
    /// Opens a new file/uploader when the first block of a file is flushed.
    new_uploader: NewStreamingUploaderFn,
    /// `try_flush` flushes the building block once its encoded length reaches this value, so
    /// blocks may slightly exceed it.
    max_block_size: usize,

    /// Index entries of blocks already written to the current file.
    block_metas: Vec<VectorBlockMeta>,
    /// Blocks already written to the current file; pushed together with `block_metas`, so the
    /// two vectors are index-aligned.
    blocks: Vec<VectorBlock>,
    /// Block under construction and the id of its first vector.
    building_block: Option<(VectorBlockBuilder, usize)>,
    /// Id assigned to the next vector passed to `add`.
    next_vector_id: usize,
    /// Number of bytes written to the current file so far (offset of the next block).
    next_block_offset: usize,
    /// File id and uploader of the current file, created on the first flushed block.
    uploader: Option<(HummockVectorFileId, ObjectStreamingUploader)>,
}
376
377impl VectorFileBuilder {
378 pub fn new(
379 dimension: usize,
380 new_uploader: NewStreamingUploaderFn,
381 next_vector_id: usize,
382 max_block_size: usize,
383 ) -> Self {
384 Self {
385 dimension,
386 new_uploader,
387 max_block_size,
388 block_metas: Vec::new(),
389 blocks: Vec::new(),
390 building_block: None,
391 uploader: None,
392 next_vector_id,
393 next_block_offset: 0,
394 }
395 }
396
397 pub fn get_vector(&self, idx: usize) -> EnumVectorAccessor<'_> {
398 if let Some((builder, start_vector_id)) = &self.building_block
399 && idx >= *start_vector_id
400 {
401 EnumVectorAccessor::Builder(builder, idx - start_vector_id)
402 } else {
403 let (block_idx, offset) =
404 search_vector(&self.block_metas, idx, |meta| meta.start_vector_id);
405 EnumVectorAccessor::BlockRef(&self.blocks[block_idx], offset)
406 }
407 }
408
409 pub fn add(&mut self, vec: VectorRef<'_>, info: &[u8]) {
410 let (builder, _) = self
411 .building_block
412 .get_or_insert_with(|| (VectorBlockBuilder::new(self.dimension), self.next_vector_id));
413 builder.add(vec, info);
414 self.next_vector_id += 1;
415 }
416
417 pub async fn finish(
418 &mut self,
419 ) -> HummockResult<Option<(VectorFileInfo, Vec<VectorBlock>, VectorFileMeta)>> {
420 self.flush_inner().await?;
421 assert!(self.building_block.is_none(), "unfinished block builder");
422 let ret = if let Some((object_id, mut uploader)) = self.uploader.take() {
423 assert!(!self.block_metas.is_empty());
424 let start_vector_id = self.block_metas.get(0).expect("non-empty").start_vector_id;
425 let meta = VectorFileMeta {
426 block_metas: take(&mut self.block_metas),
427 };
428 let meta_offset = self.next_block_offset;
429 let encoded_footer = meta.encode_footer(meta_offset);
430 let file_size = meta_offset + encoded_footer.len();
431 uploader.write_bytes(encoded_footer).await?;
432 uploader.finish().await?;
433 let file_info = VectorFileInfo {
434 object_id,
435 file_size: file_size.try_into().unwrap(),
436 start_vector_id,
437 vector_count: self.blocks.iter().map(|b| b.count()).sum::<usize>(),
438 meta_offset,
439 };
440 self.next_block_offset = 0;
441 Ok(Some((file_info, take(&mut self.blocks), meta)))
442 } else {
443 Ok(None)
444 };
445 assert!(self.is_empty(), "builder not empty after finish");
446 assert_eq!(
447 self.next_block_offset, 0,
448 "next_block_offset should be zero after finish"
449 );
450 ret
451 }
452
453 pub fn is_empty(&self) -> bool {
454 self.building_block.is_none() && self.blocks.is_empty() && self.block_metas.is_empty()
455 }
456
457 pub fn next_vector_id(&self) -> usize {
458 self.next_vector_id
459 }
460
461 pub async fn try_flush(&mut self) -> HummockResult<()> {
462 if let Some((builder, _)) = &self.building_block
463 && builder.encoded_len() >= self.max_block_size
464 {
465 self.flush_inner().await?;
466 }
467 Ok(())
468 }
469
470 async fn flush_inner(&mut self) -> HummockResult<()> {
471 if let Some((builder, start_vector_id)) = self.building_block.take() {
472 let encoded_block_len = builder.encoded_len();
473 if let Some(block) = builder.finish() {
474 let (_, uploader) = match &mut self.uploader {
475 Some(uploader) => uploader,
476 None => self.uploader.insert((self.new_uploader)().await?),
477 };
478 let encoded_block = block.encode(encoded_block_len);
479 let block_meta = VectorBlockMeta {
480 offset: self.next_block_offset,
481 block_size: encoded_block.len(),
482 vector_count: block.count(),
483 start_vector_id,
484 };
485 self.next_block_offset += encoded_block.len();
486 uploader.write_bytes(encoded_block).await?;
487 self.block_metas.push(block_meta);
488 self.blocks.push(block);
489 }
490 }
491 Ok(())
492 }
493}
494
/// Write-side state of a `FileVectorStore`.
pub(super) struct BuildingVectors {
    /// Value of `file_builder.next_vector_id()` after the last successful flush; vectors with an
    /// id at or above it are served from `file_builder` rather than from flushed files.
    flushed_next_vector_id: usize,
    /// Builder receiving vectors that have not yet been flushed to a file.
    pub(super) file_builder: VectorFileBuilder,
}
499
/// Vector store backed by vector files, optionally with an in-memory builder for new vectors.
pub(crate) struct FileVectorStore {
    /// Used to read flushed vector blocks and to populate the vector cache on flush.
    sstable_store: SstableStoreRef,

    /// Vector files already flushed (initially those listed in the index).
    flushed_vector_files: Vec<VectorFileInfo>,
    /// `None` for read-only stores created by `new_for_reader`.
    pub(super) building_vectors: Option<BuildingVectors>,
}
506
507impl FileVectorStore {
508 pub(crate) fn new_for_reader(index: &HnswFlatIndex, sstable_store: SstableStoreRef) -> Self {
509 Self {
510 sstable_store,
511 flushed_vector_files: index.vector_store_info.vector_files.clone(),
512 building_vectors: None,
513 }
514 }
515
516 pub(crate) fn new_for_writer(
517 index: &HnswFlatIndex,
518 dimension: usize,
519 sstable_store: SstableStoreRef,
520 object_id_manager: VectorObjectIdManagerRef,
521 storage_opts: &StorageOpts,
522 ) -> Self {
523 let next_vector_id = index.vector_store_info.next_vector_id;
524 Self {
525 sstable_store: sstable_store.clone(),
526 flushed_vector_files: index.vector_store_info.vector_files.clone(),
527 building_vectors: Some(BuildingVectors {
528 flushed_next_vector_id: next_vector_id,
529 file_builder: new_vector_file_builder(
530 dimension,
531 next_vector_id,
532 sstable_store,
533 object_id_manager,
534 storage_opts,
535 ),
536 }),
537 }
538 }
539
540 pub(super) async fn flush(&mut self) -> HummockResult<Option<VectorFileInfo>> {
541 let building_vectors = self.building_vectors.as_mut().expect("for write");
542 if let Some((vector_file, blocks, meta)) = building_vectors.file_builder.finish().await? {
543 self.sstable_store
544 .insert_vector_cache(vector_file.object_id, meta, blocks);
545 self.flushed_vector_files.push(vector_file.clone());
546 building_vectors.flushed_next_vector_id =
547 building_vectors.file_builder.next_vector_id();
548 Ok(Some(vector_file))
549 } else {
550 Ok(None)
551 }
552 }
553}
554
555impl VectorStore for FileVectorStore {
556 type Accessor<'a>
557 = EnumVectorAccessor<'a>
558 where
559 Self: 'a;
560
561 async fn get_vector(&self, idx: usize) -> HummockResult<Self::Accessor<'_>> {
562 if let Some(building_vectors) = self.building_vectors.as_ref()
563 && idx >= building_vectors.flushed_next_vector_id
564 {
565 Ok(building_vectors.file_builder.get_vector(idx))
566 } else {
567 Ok(EnumVectorAccessor::BlockHolder(
568 get_vector_block(&self.sstable_store, &self.flushed_vector_files, idx).await?,
569 ))
570 }
571 }
572}
573
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use risingwave_common::util::iter_util::ZipEqDebug;

    use crate::hummock::vector::file::{VectorBlock, VectorBlockBuilder};
    use crate::vector::test_utils::{gen_info, gen_vector};

    const VECTOR_DIM: usize = 128;

    /// Round-trips a 200-vector block through `encode_payload`/`decode_payload`, checking that
    /// the builder's predicted length matches the real encoding and that iteration yields the
    /// inputs in order.
    #[test]
    fn test_basic() {
        let input = (0..200)
            .map(|i| (gen_vector(VECTOR_DIM), gen_info(i)))
            .collect_vec();
        let mut builder = VectorBlockBuilder::new(VECTOR_DIM);
        for (vec, info) in &input {
            builder.add(vec.to_ref(), info);
        }
        // The running size estimate must equal the actual encoded payload size.
        let expected_encoded_len = builder.encoded_len();
        let block = builder.finish().unwrap();
        let mut encoded_block = Vec::new();
        block.encode_payload(&mut encoded_block);
        assert_eq!(expected_encoded_len, encoded_block.len());
        let decoded_block = VectorBlock::decode_payload(encoded_block.as_slice());
        assert_eq!(block, decoded_block);
        // Iteration order must follow insertion order.
        for ((expected_vec, expected_info), (actual_vec, actual_info)) in
            input.iter().zip_eq_debug(&block)
        {
            assert_eq!(expected_vec.to_ref(), actual_vec);
            assert_eq!(expected_info.iter().as_slice(), actual_info);
        }
    }

    /// A builder with no vectors produces no block.
    #[test]
    fn test_empty_builder() {
        let builder = VectorBlockBuilder::new(VECTOR_DIM);
        assert!(builder.finish().is_none());
    }
}