1use std::mem::take;
16use std::sync::Arc;
17
18use bytes::{Buf, BufMut, Bytes, BytesMut};
19use futures::future::BoxFuture;
20use risingwave_common::array::{VectorItemType, VectorRef};
21use risingwave_common::vector::{decode_vector_payload, encode_vector_payload};
22use risingwave_hummock_sdk::HummockVectorFileId;
23use risingwave_hummock_sdk::vector_index::{HnswFlatIndex, VectorFileInfo};
24use risingwave_object_store::object::ObjectStreamingUploader;
25
26use crate::hummock::vector::monitor::VectorStoreCacheStats;
27use crate::hummock::vector::writer::{VectorObjectIdManagerRef, new_vector_file_builder};
28use crate::hummock::vector::{EnumVectorAccessor, get_vector_block, search_vector};
29use crate::hummock::{
30 HummockError, HummockResult, SstableStoreRef, xxhash64_checksum, xxhash64_verify,
31};
32use crate::opts::StorageOpts;
33use crate::vector::hnsw::VectorStore;
34
/// Format version written into the footer of every vector file and required
/// to match when a footer is decoded.
const VECTOR_FILE_VERSION: u32 = 1;
/// Magic number stored in the last four bytes of every vector file footer.
const VECTOR_FILE_MAGIC_NUM: u32 = 0x3866_cd92;
37
38#[cfg_attr(test, derive(PartialEq, Debug))]
39pub struct VectorBlockInner {
40 dimension: usize,
41 vector_payload: Vec<VectorItemType>,
42 info_payload: Vec<u8>,
43 info_offset: Vec<u32>,
44}
45
46impl VectorBlockInner {
47 pub fn count(&self) -> usize {
48 self.info_offset.len()
49 }
50
51 pub fn vec_ref(&self, idx: usize) -> VectorRef<'_> {
52 let start = idx * self.dimension;
53 let end = start + self.dimension;
54 VectorRef::from_slice_unchecked(&self.vector_payload[start..end])
55 }
56
57 pub fn info(&self, idx: usize) -> &[u8] {
58 let start = if idx == 0 {
59 0
60 } else {
61 self.info_offset[idx - 1] as usize
62 };
63 let end = self.info_offset[idx] as usize;
64 &self.info_payload[start..end]
65 }
66
67 pub fn encoded_len(&self) -> usize {
68 size_of::<u32>() + size_of::<u32>() + self.info_offset.len() * size_of::<u32>() + self.info_payload.len() + self.vector_payload.len() * size_of::<f32>() }
74}
75
76pub struct VectorBlockBuilder {
77 inner: VectorBlockInner,
78 encoded_len: usize,
79}
80
81impl VectorBlockBuilder {
82 pub fn new(dimension: usize) -> Self {
83 Self {
84 inner: VectorBlockInner {
85 dimension,
86 vector_payload: vec![],
87 info_payload: vec![],
88 info_offset: vec![],
89 },
90 encoded_len: size_of::<u32>() + size_of::<u32>(), }
93 }
94
95 pub fn vec_ref(&self, idx: usize) -> VectorRef<'_> {
96 self.inner.vec_ref(idx)
97 }
98
99 pub fn info(&self, idx: usize) -> &[u8] {
100 self.inner.info(idx)
101 }
102
103 pub fn add(&mut self, vec: VectorRef<'_>, info: &[u8]) {
104 let slice = vec.as_slice();
105 assert_eq!(self.inner.dimension, slice.len());
106 self.inner.vector_payload.extend_from_slice(slice);
107 self.inner.info_payload.extend_from_slice(info);
108 let offset = self.inner.info_payload.len();
109 self.inner.info_offset.push(
110 offset
111 .try_into()
112 .unwrap_or_else(|_| panic!("offset {} overflow", offset)),
113 );
114 self.encoded_len += size_of::<u32>() + size_of_val(slice) + info.len()
115 }
116
117 pub fn encoded_len(&self) -> usize {
118 debug_assert_eq!(self.encoded_len, self.inner.encoded_len());
119 self.encoded_len
120 }
121
122 pub fn finish(self) -> Option<VectorBlock> {
123 if !self.inner.info_offset.is_empty() {
124 Some(VectorBlock(Arc::new(self.inner)))
125 } else {
126 None
127 }
128 }
129}
130
131#[cfg_attr(test, derive(PartialEq, Debug))]
132#[derive(Clone)]
133pub struct VectorBlock(Arc<VectorBlockInner>);
134
135impl VectorBlock {
136 pub fn count(&self) -> usize {
137 self.0.count()
138 }
139
140 pub fn vec_ref(&self, idx: usize) -> VectorRef<'_> {
141 self.0.vec_ref(idx)
142 }
143
144 pub fn info(&self, idx: usize) -> &[u8] {
145 self.0.info(idx)
146 }
147
148 fn encode_payload(&self, mut buf: impl BufMut) {
157 buf.put_u32_le(self.0.dimension.try_into().unwrap());
158 let vector_count = self.0.info_offset.len();
159 assert!(vector_count > 0);
160 buf.put_u32_le(
161 vector_count
162 .try_into()
163 .unwrap_or_else(|_| panic!("vector count {} overflow", vector_count)),
164 );
165 for offset in &self.0.info_offset {
166 buf.put_u32_le(*offset);
167 }
168 let last_offset = *self.0.info_offset.last().unwrap();
169 assert_eq!(last_offset as usize, self.0.info_payload.len());
170 buf.put_slice(&self.0.info_payload);
171 assert_eq!(self.0.vector_payload.len(), self.0.dimension * vector_count);
172 encode_vector_payload(&self.0.vector_payload, &mut buf);
173 }
174
175 fn decode_payload(mut buf: impl Buf) -> Self {
176 let dimension: usize = buf.get_u32_le().try_into().unwrap();
177 let vector_count = buf.get_u32_le() as usize;
178 let mut info_offset = Vec::with_capacity(vector_count);
179 for _ in 0..vector_count {
180 let offset = buf.get_u32_le();
181 info_offset.push(offset);
182 }
183 let info_payload_len = *info_offset.last().expect("non-empty") as usize;
184 let mut info_payload = vec![0; info_payload_len];
185 buf.copy_to_slice(&mut info_payload);
186 let vector_item_count = dimension * vector_count;
187 let vector_payload = decode_vector_payload(vector_item_count, buf);
188
189 Self(Arc::new(VectorBlockInner {
190 dimension,
191 vector_payload,
192 info_payload,
193 info_offset,
194 }))
195 }
196
197 fn encode(&self, encoded_block_len: usize) -> Bytes {
198 let encoded_len = encoded_block_len + size_of::<u64>(); let mut encoded_block = BytesMut::with_capacity(encoded_len);
200 self.encode_payload(&mut encoded_block);
201 let checksum = xxhash64_checksum(encoded_block.as_ref());
202 encoded_block.put_u64_le(checksum);
203 let encoded_block = encoded_block.freeze();
204 debug_assert_eq!(encoded_block.len(), encoded_len);
205 encoded_block
206 }
207
208 pub fn decode(buf: &[u8]) -> HummockResult<Self> {
209 if buf.len() < size_of::<u64>() {
210 return Err(HummockError::decode_error("block too short"));
211 }
212 let back_cursor_end = buf.len();
213 let back_cursor_start = back_cursor_end - size_of::<u64>();
214 let payload = &buf[..back_cursor_start];
215 {
216 let checksum =
217 u64::from_le_bytes(buf[back_cursor_start..back_cursor_end].try_into().unwrap());
218 xxhash64_verify(payload, checksum)?;
219 }
220 Ok(Self::decode_payload(payload))
221 }
222}
223
224impl<'a> IntoIterator for &'a VectorBlock {
225 type Item = (VectorRef<'a>, &'a [u8]);
226
227 type IntoIter = impl Iterator<Item = Self::Item>;
228
229 fn into_iter(self) -> Self::IntoIter {
230 (0..self.0.info_offset.len()).map(|i| (self.vec_ref(i), self.info(i)))
231 }
232}
233
/// Location and contents summary of one encoded block inside a vector file.
#[derive(Debug)]
pub struct VectorBlockMeta {
    /// Byte offset of the encoded block within the file.
    pub offset: usize,
    /// Size in bytes of the encoded block, trailing checksum included.
    pub block_size: usize,
    /// Number of vectors stored in the block.
    pub vector_count: usize,
    /// Id of the first vector stored in the block.
    pub start_vector_id: usize,
}
241
242impl VectorBlockMeta {
243 fn encode(&self, mut buf: impl BufMut) {
244 buf.put_u64_le(self.offset.try_into().unwrap());
245 buf.put_u64_le(self.block_size.try_into().unwrap());
246 buf.put_u64_le(self.vector_count.try_into().unwrap());
247 buf.put_u64_le(self.start_vector_id.try_into().unwrap());
248 }
249
250 fn encoded_len() -> usize {
251 size_of::<u64>() * 4 }
253
254 fn decode(mut buf: impl Buf) -> Self {
255 let offset = buf.get_u64_le().try_into().unwrap();
256 let block_size = buf.get_u64_le().try_into().unwrap();
257 let vector_count = buf.get_u64_le().try_into().unwrap();
258 let start_vector_id = buf.get_u64_le().try_into().unwrap();
259 Self {
260 offset,
261 block_size,
262 vector_count,
263 start_vector_id,
264 }
265 }
266}
267
268pub struct VectorFileMeta {
269 pub block_metas: Vec<VectorBlockMeta>,
270}
271
272impl VectorFileMeta {
273 fn encode(&self, mut buf: impl BufMut) {
274 buf.put_u32_le(self.block_metas.len().try_into().unwrap());
275 for meta in &self.block_metas {
276 meta.encode(&mut buf);
277 }
278 }
279
280 fn encode_len(&self) -> usize {
281 size_of::<u32>() + self.block_metas.len() * VectorBlockMeta::encoded_len()
283 }
284
285 fn decode(mut buf: impl Buf) -> Self {
286 let block_count = buf.get_u32_le() as usize;
287 let mut block_metas = Vec::with_capacity(block_count);
288 for _ in 0..block_count {
289 block_metas.push(VectorBlockMeta::decode(&mut buf));
290 }
291 Self { block_metas }
292 }
293
294 fn preserved_footer_len() -> usize {
295 size_of::<u64>() + size_of::<u64>() + size_of::<u32>() + size_of::<u32>() }
300
301 fn encode_footer(&self, mete_offset: usize) -> Bytes {
302 let encoded_footer_len = self.encode_len() + Self::preserved_footer_len();
303 let mut encoded_footer = BytesMut::with_capacity(encoded_footer_len);
304 self.encode(&mut encoded_footer);
305 let checksum = xxhash64_checksum(encoded_footer.as_ref());
306 encoded_footer.put_u64_le(checksum);
307 encoded_footer.put_u64_le(mete_offset.try_into().unwrap());
308 encoded_footer.put_u32_le(VECTOR_FILE_VERSION);
309 encoded_footer.put_u32_le(VECTOR_FILE_MAGIC_NUM);
310 let encoded_footer = encoded_footer.freeze();
311 debug_assert_eq!(encoded_footer.len(), encoded_footer_len);
312 encoded_footer
313 }
314
315 pub fn decode_footer(buf: &[u8]) -> HummockResult<Self> {
316 if buf.len() < Self::preserved_footer_len() {
317 return Err(HummockError::decode_error("footer too short"));
318 }
319 let mut back_cursor_end = buf.len();
321 let mut back_cursor_start = back_cursor_end - size_of::<u32>();
322 {
323 let magic =
324 u32::from_le_bytes(buf[back_cursor_start..back_cursor_end].try_into().unwrap());
325 if magic != VECTOR_FILE_MAGIC_NUM {
326 return Err(HummockError::magic_mismatch(VECTOR_FILE_MAGIC_NUM, magic));
327 }
328 }
329 back_cursor_end = back_cursor_start;
331 back_cursor_start = back_cursor_end - size_of::<u32>();
332 {
333 let file_version =
334 u32::from_le_bytes(buf[back_cursor_start..back_cursor_end].try_into().unwrap());
335 if file_version != VECTOR_FILE_VERSION {
336 return Err(HummockError::invalid_format_version(file_version));
337 }
338 }
339
340 back_cursor_end = back_cursor_start;
342 back_cursor_start = back_cursor_end - size_of::<u64>();
343
344 back_cursor_end = back_cursor_start;
346 back_cursor_start = back_cursor_end - size_of::<u64>();
347 let payload = &buf[..back_cursor_start];
348 {
349 let checksum =
350 u64::from_le_bytes(buf[back_cursor_start..back_cursor_end].try_into().unwrap());
351 xxhash64_verify(payload, checksum)?;
352 }
353
354 Ok(Self::decode(payload))
355 }
356}
357
358pub type NewStreamingUploaderFn = Box<
359 dyn Fn() -> BoxFuture<'static, HummockResult<(HummockVectorFileId, ObjectStreamingUploader)>>
360 + Send
361 + Sync,
362>;
363
364pub struct VectorFileBuilder {
365 dimension: usize,
366 new_uploader: NewStreamingUploaderFn,
367 max_block_size: usize,
368
369 block_metas: Vec<VectorBlockMeta>,
371 blocks: Vec<VectorBlock>,
372 building_block: Option<(VectorBlockBuilder, usize)>,
373 next_vector_id: usize,
374 next_block_offset: usize,
375 uploader: Option<(HummockVectorFileId, ObjectStreamingUploader)>,
376}
377
378impl VectorFileBuilder {
379 pub fn new(
380 dimension: usize,
381 new_uploader: NewStreamingUploaderFn,
382 next_vector_id: usize,
383 max_block_size: usize,
384 ) -> Self {
385 Self {
386 dimension,
387 new_uploader,
388 max_block_size,
389 block_metas: Vec::new(),
390 blocks: Vec::new(),
391 building_block: None,
392 uploader: None,
393 next_vector_id,
394 next_block_offset: 0,
395 }
396 }
397
398 pub fn get_vector(&self, idx: usize) -> EnumVectorAccessor<'_> {
399 if let Some((builder, start_vector_id)) = &self.building_block
400 && idx >= *start_vector_id
401 {
402 EnumVectorAccessor::Builder(builder, idx - start_vector_id)
403 } else {
404 let (block_idx, offset) =
405 search_vector(&self.block_metas, idx, |meta| meta.start_vector_id);
406 EnumVectorAccessor::BlockRef(&self.blocks[block_idx], offset)
407 }
408 }
409
410 pub fn add(&mut self, vec: VectorRef<'_>, info: &[u8]) {
411 let (builder, _) = self
412 .building_block
413 .get_or_insert_with(|| (VectorBlockBuilder::new(self.dimension), self.next_vector_id));
414 builder.add(vec, info);
415 self.next_vector_id += 1;
416 }
417
418 pub async fn finish(
419 &mut self,
420 ) -> HummockResult<Option<(VectorFileInfo, Vec<VectorBlock>, VectorFileMeta)>> {
421 self.flush_inner().await?;
422 assert!(self.building_block.is_none(), "unfinished block builder");
423 let ret = if let Some((object_id, mut uploader)) = self.uploader.take() {
424 assert!(!self.block_metas.is_empty());
425 let start_vector_id = self.block_metas.get(0).expect("non-empty").start_vector_id;
426 let meta = VectorFileMeta {
427 block_metas: take(&mut self.block_metas),
428 };
429 let meta_offset = self.next_block_offset;
430 let encoded_footer = meta.encode_footer(meta_offset);
431 let file_size = meta_offset + encoded_footer.len();
432 uploader.write_bytes(encoded_footer).await?;
433 uploader.finish().await?;
434 let file_info = VectorFileInfo {
435 object_id,
436 file_size: file_size.try_into().unwrap(),
437 start_vector_id,
438 vector_count: self.blocks.iter().map(|b| b.count()).sum::<usize>(),
439 meta_offset,
440 };
441 self.next_block_offset = 0;
442 Ok(Some((file_info, take(&mut self.blocks), meta)))
443 } else {
444 Ok(None)
445 };
446 assert!(self.is_empty(), "builder not empty after finish");
447 assert_eq!(
448 self.next_block_offset, 0,
449 "next_block_offset should be zero after finish"
450 );
451 ret
452 }
453
454 pub fn is_empty(&self) -> bool {
455 self.building_block.is_none() && self.blocks.is_empty() && self.block_metas.is_empty()
456 }
457
458 pub fn next_vector_id(&self) -> usize {
459 self.next_vector_id
460 }
461
462 pub async fn try_flush(&mut self) -> HummockResult<()> {
463 if let Some((builder, _)) = &self.building_block
464 && builder.encoded_len() >= self.max_block_size
465 {
466 self.flush_inner().await?;
467 }
468 Ok(())
469 }
470
471 async fn flush_inner(&mut self) -> HummockResult<()> {
472 if let Some((builder, start_vector_id)) = self.building_block.take() {
473 let encoded_block_len = builder.encoded_len();
474 if let Some(block) = builder.finish() {
475 let (_, uploader) = match &mut self.uploader {
476 Some(uploader) => uploader,
477 None => self.uploader.insert((self.new_uploader)().await?),
478 };
479 let encoded_block = block.encode(encoded_block_len);
480 let block_meta = VectorBlockMeta {
481 offset: self.next_block_offset,
482 block_size: encoded_block.len(),
483 vector_count: block.count(),
484 start_vector_id,
485 };
486 self.next_block_offset += encoded_block.len();
487 uploader.write_bytes(encoded_block).await?;
488 self.block_metas.push(block_meta);
489 self.blocks.push(block);
490 }
491 }
492 Ok(())
493 }
494}
495
496pub(super) struct BuildingVectors {
497 flushed_next_vector_id: usize,
498 pub(super) file_builder: VectorFileBuilder,
499}
500
501#[derive(Default)]
502pub(crate) struct FileVectorStoreCtx {
503 pub stats: VectorStoreCacheStats,
504}
505
506pub(crate) struct FileVectorStore {
507 sstable_store: SstableStoreRef,
508
509 flushed_vector_files: Vec<VectorFileInfo>,
510 pub(super) building_vectors: Option<BuildingVectors>,
511}
512
513impl FileVectorStore {
514 pub(crate) fn new_for_reader(index: &HnswFlatIndex, sstable_store: SstableStoreRef) -> Self {
515 Self {
516 sstable_store,
517 flushed_vector_files: index.vector_store_info.vector_files.clone(),
518 building_vectors: None,
519 }
520 }
521
522 pub(crate) fn new_for_writer(
523 index: &HnswFlatIndex,
524 dimension: usize,
525 sstable_store: SstableStoreRef,
526 object_id_manager: VectorObjectIdManagerRef,
527 storage_opts: &StorageOpts,
528 ) -> Self {
529 let next_vector_id = index.vector_store_info.next_vector_id;
530 Self {
531 sstable_store: sstable_store.clone(),
532 flushed_vector_files: index.vector_store_info.vector_files.clone(),
533 building_vectors: Some(BuildingVectors {
534 flushed_next_vector_id: next_vector_id,
535 file_builder: new_vector_file_builder(
536 dimension,
537 next_vector_id,
538 sstable_store,
539 object_id_manager,
540 storage_opts,
541 ),
542 }),
543 }
544 }
545
546 pub(super) async fn flush(&mut self) -> HummockResult<Option<VectorFileInfo>> {
547 let building_vectors = self.building_vectors.as_mut().expect("for write");
548 if let Some((vector_file, blocks, meta)) = building_vectors.file_builder.finish().await? {
549 self.sstable_store
550 .insert_vector_cache(vector_file.object_id, meta, blocks);
551 self.flushed_vector_files.push(vector_file.clone());
552 building_vectors.flushed_next_vector_id =
553 building_vectors.file_builder.next_vector_id();
554 Ok(Some(vector_file))
555 } else {
556 Ok(None)
557 }
558 }
559
560 pub fn flushed_vector_files(&self) -> &[VectorFileInfo] {
561 &self.flushed_vector_files
562 }
563}
564
565impl VectorStore for FileVectorStore {
566 type Accessor<'a>
567 = EnumVectorAccessor<'a>
568 where
569 Self: 'a;
570 type Ctx = FileVectorStoreCtx;
571 type Err = HummockError;
572
573 async fn get_vector(
574 &self,
575 idx: usize,
576 ctx: &mut Self::Ctx,
577 ) -> HummockResult<Self::Accessor<'_>> {
578 if let Some(building_vectors) = self.building_vectors.as_ref()
579 && idx >= building_vectors.flushed_next_vector_id
580 {
581 Ok(building_vectors.file_builder.get_vector(idx))
582 } else {
583 Ok(EnumVectorAccessor::BlockHolder(
584 get_vector_block(
585 &self.sstable_store,
586 &self.flushed_vector_files,
587 idx,
588 &mut ctx.stats,
589 )
590 .await?,
591 ))
592 }
593 }
594}
595
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use risingwave_common::util::iter_util::ZipEqDebug;

    use crate::hummock::vector::file::{VectorBlock, VectorBlockBuilder};
    use crate::vector::test_utils::{gen_info, gen_vector};

    const VECTOR_DIM: usize = 128;

    /// Builds a block, round-trips it through the payload encoding and checks
    /// both the size accounting and the decoded contents.
    #[test]
    fn test_basic() {
        let input = (0..200)
            .map(|i| (gen_vector(VECTOR_DIM), gen_info(i)))
            .collect_vec();

        let mut builder = VectorBlockBuilder::new(VECTOR_DIM);
        for (vec, info) in &input {
            builder.add(vec.to_ref(), info);
        }
        let expected_encoded_len = builder.encoded_len();
        let block = builder.finish().unwrap();

        // The size tracked by the builder must equal the bytes actually
        // produced by the encoder.
        let mut encoded_block = Vec::new();
        block.encode_payload(&mut encoded_block);
        assert_eq!(expected_encoded_len, encoded_block.len());

        let decoded_block = VectorBlock::decode_payload(encoded_block.as_slice());
        assert_eq!(block, decoded_block);

        let expected_and_actual = input.iter().zip_eq_debug(&block);
        for ((expected_vec, expected_info), (actual_vec, actual_info)) in expected_and_actual {
            assert_eq!(expected_vec.to_ref(), actual_vec);
            assert_eq!(expected_info.iter().as_slice(), actual_info);
        }
    }

    /// A builder that received no vector yields no block.
    #[test]
    fn test_empty_builder() {
        let builder = VectorBlockBuilder::new(VECTOR_DIM);
        assert!(builder.finish().is_none());
    }
}