risingwave_storage/hummock/vector/
file.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::mem::take;
16use std::sync::Arc;
17
18use bytes::{Buf, BufMut, Bytes, BytesMut};
19use futures::future::BoxFuture;
20use risingwave_common::array::{VectorItemType, VectorRef};
21use risingwave_common::vector::{decode_vector_payload, encode_vector_payload};
22use risingwave_hummock_sdk::HummockVectorFileId;
23use risingwave_hummock_sdk::vector_index::{HnswFlatIndex, VectorFileInfo};
24use risingwave_object_store::object::ObjectStreamingUploader;
25
26use crate::hummock::vector::monitor::VectorStoreCacheStats;
27use crate::hummock::vector::writer::{VectorObjectIdManagerRef, new_vector_file_builder};
28use crate::hummock::vector::{EnumVectorAccessor, get_vector_block, search_vector};
29use crate::hummock::{
30    HummockError, HummockResult, SstableStoreRef, xxhash64_checksum, xxhash64_verify,
31};
32use crate::opts::StorageOpts;
33use crate::vector::hnsw::VectorStore;
34
/// Format version written into every vector file footer; `decode_footer` rejects any other value.
const VECTOR_FILE_VERSION: u32 = 1;
/// Magic number stored as the very last `u32` of a vector file footer.
const VECTOR_FILE_MAGIC_NUM: u32 = 0x3866_cd92;
37
38#[cfg_attr(test, derive(PartialEq, Debug))]
39pub struct VectorBlockInner {
40    dimension: usize,
41    vector_payload: Vec<VectorItemType>,
42    info_payload: Vec<u8>,
43    info_offset: Vec<u32>,
44}
45
46impl VectorBlockInner {
47    pub fn count(&self) -> usize {
48        self.info_offset.len()
49    }
50
51    pub fn vec_ref(&self, idx: usize) -> VectorRef<'_> {
52        let start = idx * self.dimension;
53        let end = start + self.dimension;
54        VectorRef::from_slice_unchecked(&self.vector_payload[start..end])
55    }
56
57    pub fn info(&self, idx: usize) -> &[u8] {
58        let start = if idx == 0 {
59            0
60        } else {
61            self.info_offset[idx - 1] as usize
62        };
63        let end = self.info_offset[idx] as usize;
64        &self.info_payload[start..end]
65    }
66
67    pub fn encoded_len(&self) -> usize {
68        size_of::<u32>() // dimension
69            + size_of::<u32>() // vector count
70            + self.info_offset.len() * size_of::<u32>() // info offsets
71            + self.info_payload.len() // info payload
72            + self.vector_payload.len() * size_of::<f32>() // vector payload
73    }
74}
75
76pub struct VectorBlockBuilder {
77    inner: VectorBlockInner,
78    encoded_len: usize,
79}
80
81impl VectorBlockBuilder {
82    pub fn new(dimension: usize) -> Self {
83        Self {
84            inner: VectorBlockInner {
85                dimension,
86                vector_payload: vec![],
87                info_payload: vec![],
88                info_offset: vec![],
89            },
90            encoded_len: size_of::<u32>() // dimension
91             + size_of::<u32>(), // vector count
92        }
93    }
94
95    pub fn vec_ref(&self, idx: usize) -> VectorRef<'_> {
96        self.inner.vec_ref(idx)
97    }
98
99    pub fn info(&self, idx: usize) -> &[u8] {
100        self.inner.info(idx)
101    }
102
103    pub fn add(&mut self, vec: VectorRef<'_>, info: &[u8]) {
104        let slice = vec.as_slice();
105        assert_eq!(self.inner.dimension, slice.len());
106        self.inner.vector_payload.extend_from_slice(slice);
107        self.inner.info_payload.extend_from_slice(info);
108        let offset = self.inner.info_payload.len();
109        self.inner.info_offset.push(
110            offset
111                .try_into()
112                .unwrap_or_else(|_| panic!("offset {} overflow", offset)),
113        );
114        self.encoded_len += size_of::<u32>() + size_of_val(slice) + info.len()
115    }
116
117    pub fn encoded_len(&self) -> usize {
118        debug_assert_eq!(self.encoded_len, self.inner.encoded_len());
119        self.encoded_len
120    }
121
122    pub fn finish(self) -> Option<VectorBlock> {
123        if !self.inner.info_offset.is_empty() {
124            Some(VectorBlock(Arc::new(self.inner)))
125        } else {
126            None
127        }
128    }
129}
130
131#[cfg_attr(test, derive(PartialEq, Debug))]
132#[derive(Clone)]
133pub struct VectorBlock(Arc<VectorBlockInner>);
134
135impl VectorBlock {
136    pub fn count(&self) -> usize {
137        self.0.count()
138    }
139
140    pub fn vec_ref(&self, idx: usize) -> VectorRef<'_> {
141        self.0.vec_ref(idx)
142    }
143
144    pub fn info(&self, idx: usize) -> &[u8] {
145        self.0.info(idx)
146    }
147
148    /// # Format:
149    ///
150    /// ```plain
151    /// | dimension (u32) | vector_count (u32) |
152    /// | info_offset (u32) * vector_count |
153    /// | info_payload (u8) * (last offset in info_offset) |
154    /// | vector_payload (f32) * (dimension * vector_count)  |
155    /// ```
156    fn encode_payload(&self, mut buf: impl BufMut) {
157        buf.put_u32_le(self.0.dimension.try_into().unwrap());
158        let vector_count = self.0.info_offset.len();
159        assert!(vector_count > 0);
160        buf.put_u32_le(
161            vector_count
162                .try_into()
163                .unwrap_or_else(|_| panic!("vector count {} overflow", vector_count)),
164        );
165        for offset in &self.0.info_offset {
166            buf.put_u32_le(*offset);
167        }
168        let last_offset = *self.0.info_offset.last().unwrap();
169        assert_eq!(last_offset as usize, self.0.info_payload.len());
170        buf.put_slice(&self.0.info_payload);
171        assert_eq!(self.0.vector_payload.len(), self.0.dimension * vector_count);
172        encode_vector_payload(&self.0.vector_payload, &mut buf);
173    }
174
175    fn decode_payload(mut buf: impl Buf) -> Self {
176        let dimension: usize = buf.get_u32_le().try_into().unwrap();
177        let vector_count = buf.get_u32_le() as usize;
178        let mut info_offset = Vec::with_capacity(vector_count);
179        for _ in 0..vector_count {
180            let offset = buf.get_u32_le();
181            info_offset.push(offset);
182        }
183        let info_payload_len = *info_offset.last().expect("non-empty") as usize;
184        let mut info_payload = vec![0; info_payload_len];
185        buf.copy_to_slice(&mut info_payload);
186        let vector_item_count = dimension * vector_count;
187        let vector_payload = decode_vector_payload(vector_item_count, buf);
188
189        Self(Arc::new(VectorBlockInner {
190            dimension,
191            vector_payload,
192            info_payload,
193            info_offset,
194        }))
195    }
196
197    fn encode(&self, encoded_block_len: usize) -> Bytes {
198        let encoded_len = encoded_block_len + size_of::<u64>(); // add space for checksum
199        let mut encoded_block = BytesMut::with_capacity(encoded_len);
200        self.encode_payload(&mut encoded_block);
201        let checksum = xxhash64_checksum(encoded_block.as_ref());
202        encoded_block.put_u64_le(checksum);
203        let encoded_block = encoded_block.freeze();
204        debug_assert_eq!(encoded_block.len(), encoded_len);
205        encoded_block
206    }
207
208    pub fn decode(buf: &[u8]) -> HummockResult<Self> {
209        if buf.len() < size_of::<u64>() {
210            return Err(HummockError::decode_error("block too short"));
211        }
212        let back_cursor_end = buf.len();
213        let back_cursor_start = back_cursor_end - size_of::<u64>();
214        let payload = &buf[..back_cursor_start];
215        {
216            let checksum =
217                u64::from_le_bytes(buf[back_cursor_start..back_cursor_end].try_into().unwrap());
218            xxhash64_verify(payload, checksum)?;
219        }
220        Ok(Self::decode_payload(payload))
221    }
222}
223
224impl<'a> IntoIterator for &'a VectorBlock {
225    type Item = (VectorRef<'a>, &'a [u8]);
226
227    type IntoIter = impl Iterator<Item = Self::Item>;
228
229    fn into_iter(self) -> Self::IntoIter {
230        (0..self.0.info_offset.len()).map(|i| (self.vec_ref(i), self.info(i)))
231    }
232}
233
234#[derive(Debug)]
235pub struct VectorBlockMeta {
236    pub offset: usize,
237    pub block_size: usize,
238    pub vector_count: usize,
239    pub start_vector_id: usize,
240}
241
242impl VectorBlockMeta {
243    fn encode(&self, mut buf: impl BufMut) {
244        buf.put_u64_le(self.offset.try_into().unwrap());
245        buf.put_u64_le(self.block_size.try_into().unwrap());
246        buf.put_u64_le(self.vector_count.try_into().unwrap());
247        buf.put_u64_le(self.start_vector_id.try_into().unwrap());
248    }
249
250    fn encoded_len() -> usize {
251        size_of::<u64>() * 4 // offset, block_size, vector_count, start_vector_id
252    }
253
254    fn decode(mut buf: impl Buf) -> Self {
255        let offset = buf.get_u64_le().try_into().unwrap();
256        let block_size = buf.get_u64_le().try_into().unwrap();
257        let vector_count = buf.get_u64_le().try_into().unwrap();
258        let start_vector_id = buf.get_u64_le().try_into().unwrap();
259        Self {
260            offset,
261            block_size,
262            vector_count,
263            start_vector_id,
264        }
265    }
266}
267
268pub struct VectorFileMeta {
269    pub block_metas: Vec<VectorBlockMeta>,
270}
271
272impl VectorFileMeta {
273    fn encode(&self, mut buf: impl BufMut) {
274        buf.put_u32_le(self.block_metas.len().try_into().unwrap());
275        for meta in &self.block_metas {
276            meta.encode(&mut buf);
277        }
278    }
279
280    fn encode_len(&self) -> usize {
281        size_of::<u32>() // block count
282            + self.block_metas.len() * VectorBlockMeta::encoded_len()
283    }
284
285    fn decode(mut buf: impl Buf) -> Self {
286        let block_count = buf.get_u32_le() as usize;
287        let mut block_metas = Vec::with_capacity(block_count);
288        for _ in 0..block_count {
289            block_metas.push(VectorBlockMeta::decode(&mut buf));
290        }
291        Self { block_metas }
292    }
293
294    fn preserved_footer_len() -> usize {
295        size_of::<u64>() // checksum
296            + size_of::<u64>() // meta offset
297            + size_of::<u32>() // version
298            + size_of::<u32>() // magic
299    }
300
301    fn encode_footer(&self, mete_offset: usize) -> Bytes {
302        let encoded_footer_len = self.encode_len() + Self::preserved_footer_len();
303        let mut encoded_footer = BytesMut::with_capacity(encoded_footer_len);
304        self.encode(&mut encoded_footer);
305        let checksum = xxhash64_checksum(encoded_footer.as_ref());
306        encoded_footer.put_u64_le(checksum);
307        encoded_footer.put_u64_le(mete_offset.try_into().unwrap());
308        encoded_footer.put_u32_le(VECTOR_FILE_VERSION);
309        encoded_footer.put_u32_le(VECTOR_FILE_MAGIC_NUM);
310        let encoded_footer = encoded_footer.freeze();
311        debug_assert_eq!(encoded_footer.len(), encoded_footer_len);
312        encoded_footer
313    }
314
315    pub fn decode_footer(buf: &[u8]) -> HummockResult<Self> {
316        if buf.len() < Self::preserved_footer_len() {
317            return Err(HummockError::decode_error("footer too short"));
318        }
319        // get magic number
320        let mut back_cursor_end = buf.len();
321        let mut back_cursor_start = back_cursor_end - size_of::<u32>();
322        {
323            let magic =
324                u32::from_le_bytes(buf[back_cursor_start..back_cursor_end].try_into().unwrap());
325            if magic != VECTOR_FILE_MAGIC_NUM {
326                return Err(HummockError::magic_mismatch(VECTOR_FILE_MAGIC_NUM, magic));
327            }
328        }
329        // get file version
330        back_cursor_end = back_cursor_start;
331        back_cursor_start = back_cursor_end - size_of::<u32>();
332        {
333            let file_version =
334                u32::from_le_bytes(buf[back_cursor_start..back_cursor_end].try_into().unwrap());
335            if file_version != VECTOR_FILE_VERSION {
336                return Err(HummockError::invalid_format_version(file_version));
337            }
338        }
339
340        // skip meta offset
341        back_cursor_end = back_cursor_start;
342        back_cursor_start = back_cursor_end - size_of::<u64>();
343
344        // get and check checksum
345        back_cursor_end = back_cursor_start;
346        back_cursor_start = back_cursor_end - size_of::<u64>();
347        let payload = &buf[..back_cursor_start];
348        {
349            let checksum =
350                u64::from_le_bytes(buf[back_cursor_start..back_cursor_end].try_into().unwrap());
351            xxhash64_verify(payload, checksum)?;
352        }
353
354        Ok(Self::decode(payload))
355    }
356}
357
358pub type NewStreamingUploaderFn = Box<
359    dyn Fn() -> BoxFuture<'static, HummockResult<(HummockVectorFileId, ObjectStreamingUploader)>>
360        + Send
361        + Sync,
362>;
363
364pub struct VectorFileBuilder {
365    dimension: usize,
366    new_uploader: NewStreamingUploaderFn,
367    max_block_size: usize,
368
369    // builder state
370    block_metas: Vec<VectorBlockMeta>,
371    blocks: Vec<VectorBlock>,
372    building_block: Option<(VectorBlockBuilder, usize)>,
373    next_vector_id: usize,
374    next_block_offset: usize,
375    uploader: Option<(HummockVectorFileId, ObjectStreamingUploader)>,
376}
377
378impl VectorFileBuilder {
379    pub fn new(
380        dimension: usize,
381        new_uploader: NewStreamingUploaderFn,
382        next_vector_id: usize,
383        max_block_size: usize,
384    ) -> Self {
385        Self {
386            dimension,
387            new_uploader,
388            max_block_size,
389            block_metas: Vec::new(),
390            blocks: Vec::new(),
391            building_block: None,
392            uploader: None,
393            next_vector_id,
394            next_block_offset: 0,
395        }
396    }
397
398    pub fn get_vector(&self, idx: usize) -> EnumVectorAccessor<'_> {
399        if let Some((builder, start_vector_id)) = &self.building_block
400            && idx >= *start_vector_id
401        {
402            EnumVectorAccessor::Builder(builder, idx - start_vector_id)
403        } else {
404            let (block_idx, offset) =
405                search_vector(&self.block_metas, idx, |meta| meta.start_vector_id);
406            EnumVectorAccessor::BlockRef(&self.blocks[block_idx], offset)
407        }
408    }
409
410    pub fn add(&mut self, vec: VectorRef<'_>, info: &[u8]) {
411        let (builder, _) = self
412            .building_block
413            .get_or_insert_with(|| (VectorBlockBuilder::new(self.dimension), self.next_vector_id));
414        builder.add(vec, info);
415        self.next_vector_id += 1;
416    }
417
418    pub async fn finish(
419        &mut self,
420    ) -> HummockResult<Option<(VectorFileInfo, Vec<VectorBlock>, VectorFileMeta)>> {
421        self.flush_inner().await?;
422        assert!(self.building_block.is_none(), "unfinished block builder");
423        let ret = if let Some((object_id, mut uploader)) = self.uploader.take() {
424            assert!(!self.block_metas.is_empty());
425            let start_vector_id = self.block_metas.get(0).expect("non-empty").start_vector_id;
426            let meta = VectorFileMeta {
427                block_metas: take(&mut self.block_metas),
428            };
429            let meta_offset = self.next_block_offset;
430            let encoded_footer = meta.encode_footer(meta_offset);
431            let file_size = meta_offset + encoded_footer.len();
432            uploader.write_bytes(encoded_footer).await?;
433            uploader.finish().await?;
434            let file_info = VectorFileInfo {
435                object_id,
436                file_size: file_size.try_into().unwrap(),
437                start_vector_id,
438                vector_count: self.blocks.iter().map(|b| b.count()).sum::<usize>(),
439                meta_offset,
440            };
441            self.next_block_offset = 0;
442            Ok(Some((file_info, take(&mut self.blocks), meta)))
443        } else {
444            Ok(None)
445        };
446        assert!(self.is_empty(), "builder not empty after finish");
447        assert_eq!(
448            self.next_block_offset, 0,
449            "next_block_offset should be zero after finish"
450        );
451        ret
452    }
453
454    pub fn is_empty(&self) -> bool {
455        self.building_block.is_none() && self.blocks.is_empty() && self.block_metas.is_empty()
456    }
457
458    pub fn next_vector_id(&self) -> usize {
459        self.next_vector_id
460    }
461
462    pub async fn try_flush(&mut self) -> HummockResult<()> {
463        if let Some((builder, _)) = &self.building_block
464            && builder.encoded_len() >= self.max_block_size
465        {
466            self.flush_inner().await?;
467        }
468        Ok(())
469    }
470
471    async fn flush_inner(&mut self) -> HummockResult<()> {
472        if let Some((builder, start_vector_id)) = self.building_block.take() {
473            let encoded_block_len = builder.encoded_len();
474            if let Some(block) = builder.finish() {
475                let (_, uploader) = match &mut self.uploader {
476                    Some(uploader) => uploader,
477                    None => self.uploader.insert((self.new_uploader)().await?),
478                };
479                let encoded_block = block.encode(encoded_block_len);
480                let block_meta = VectorBlockMeta {
481                    offset: self.next_block_offset,
482                    block_size: encoded_block.len(),
483                    vector_count: block.count(),
484                    start_vector_id,
485                };
486                self.next_block_offset += encoded_block.len();
487                uploader.write_bytes(encoded_block).await?;
488                self.block_metas.push(block_meta);
489                self.blocks.push(block);
490            }
491        }
492        Ok(())
493    }
494}
495
496pub(super) struct BuildingVectors {
497    flushed_next_vector_id: usize,
498    pub(super) file_builder: VectorFileBuilder,
499}
500
501#[derive(Default)]
502pub(crate) struct FileVectorStoreCtx {
503    pub stats: VectorStoreCacheStats,
504}
505
506pub(crate) struct FileVectorStore {
507    sstable_store: SstableStoreRef,
508
509    flushed_vector_files: Vec<VectorFileInfo>,
510    pub(super) building_vectors: Option<BuildingVectors>,
511}
512
513impl FileVectorStore {
514    pub(crate) fn new_for_reader(index: &HnswFlatIndex, sstable_store: SstableStoreRef) -> Self {
515        Self {
516            sstable_store,
517            flushed_vector_files: index.vector_store_info.vector_files.clone(),
518            building_vectors: None,
519        }
520    }
521
522    pub(crate) fn new_for_writer(
523        index: &HnswFlatIndex,
524        dimension: usize,
525        sstable_store: SstableStoreRef,
526        object_id_manager: VectorObjectIdManagerRef,
527        storage_opts: &StorageOpts,
528    ) -> Self {
529        let next_vector_id = index.vector_store_info.next_vector_id;
530        Self {
531            sstable_store: sstable_store.clone(),
532            flushed_vector_files: index.vector_store_info.vector_files.clone(),
533            building_vectors: Some(BuildingVectors {
534                flushed_next_vector_id: next_vector_id,
535                file_builder: new_vector_file_builder(
536                    dimension,
537                    next_vector_id,
538                    sstable_store,
539                    object_id_manager,
540                    storage_opts,
541                ),
542            }),
543        }
544    }
545
546    pub(super) async fn flush(&mut self) -> HummockResult<Option<VectorFileInfo>> {
547        let building_vectors = self.building_vectors.as_mut().expect("for write");
548        if let Some((vector_file, blocks, meta)) = building_vectors.file_builder.finish().await? {
549            self.sstable_store
550                .insert_vector_cache(vector_file.object_id, meta, blocks);
551            self.flushed_vector_files.push(vector_file.clone());
552            building_vectors.flushed_next_vector_id =
553                building_vectors.file_builder.next_vector_id();
554            Ok(Some(vector_file))
555        } else {
556            Ok(None)
557        }
558    }
559
560    pub fn flushed_vector_files(&self) -> &[VectorFileInfo] {
561        &self.flushed_vector_files
562    }
563}
564
565impl VectorStore for FileVectorStore {
566    type Accessor<'a>
567        = EnumVectorAccessor<'a>
568    where
569        Self: 'a;
570    type Ctx = FileVectorStoreCtx;
571    type Err = HummockError;
572
573    async fn get_vector(
574        &self,
575        idx: usize,
576        ctx: &mut Self::Ctx,
577    ) -> HummockResult<Self::Accessor<'_>> {
578        if let Some(building_vectors) = self.building_vectors.as_ref()
579            && idx >= building_vectors.flushed_next_vector_id
580        {
581            Ok(building_vectors.file_builder.get_vector(idx))
582        } else {
583            Ok(EnumVectorAccessor::BlockHolder(
584                get_vector_block(
585                    &self.sstable_store,
586                    &self.flushed_vector_files,
587                    idx,
588                    &mut ctx.stats,
589                )
590                .await?,
591            ))
592        }
593    }
594}
595
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use risingwave_common::util::iter_util::ZipEqDebug;

    use crate::hummock::vector::file::{
        VectorBlock, VectorBlockBuilder, VectorBlockMeta, VectorFileMeta,
    };
    use crate::vector::test_utils::{gen_info, gen_vector};

    const VECTOR_DIM: usize = 128;

    #[test]
    fn test_basic() {
        let input = (0..200)
            .map(|i| (gen_vector(VECTOR_DIM), gen_info(i)))
            .collect_vec();
        let mut builder = VectorBlockBuilder::new(VECTOR_DIM);
        for (vec, info) in &input {
            builder.add(vec.to_ref(), info);
        }
        let expected_encoded_len = builder.encoded_len();
        let block = builder.finish().unwrap();
        let mut encoded_block = Vec::new();
        block.encode_payload(&mut encoded_block);
        assert_eq!(expected_encoded_len, encoded_block.len());
        let decoded_block = VectorBlock::decode_payload(encoded_block.as_slice());
        assert_eq!(block, decoded_block);
        for ((expected_vec, expected_info), (actual_vec, actual_info)) in
            input.iter().zip_eq_debug(&block)
        {
            assert_eq!(expected_vec.to_ref(), actual_vec);
            assert_eq!(expected_info.iter().as_slice(), actual_info);
        }
    }

    #[test]
    fn test_empty_builder() {
        let builder = VectorBlockBuilder::new(VECTOR_DIM);
        assert!(builder.finish().is_none());
    }

    /// Round-trips a block through the checksummed `encode`/`decode` pair and
    /// checks that corruption and truncation are reported as errors.
    #[test]
    fn test_block_encode_decode_with_checksum() {
        let input = (0..10)
            .map(|i| (gen_vector(VECTOR_DIM), gen_info(i)))
            .collect_vec();
        let mut builder = VectorBlockBuilder::new(VECTOR_DIM);
        for (vec, info) in &input {
            builder.add(vec.to_ref(), info);
        }
        let encoded_len = builder.encoded_len();
        let block = builder.finish().unwrap();
        let encoded = block.encode(encoded_len);
        let Ok(decoded) = VectorBlock::decode(&encoded) else {
            panic!("failed to decode a freshly encoded block");
        };
        assert_eq!(block, decoded);

        // A flipped payload byte must be caught by the checksum.
        let mut corrupted = encoded.to_vec();
        corrupted[0] ^= 0xff;
        assert!(VectorBlock::decode(&corrupted).is_err());

        // A buffer too short to even hold the checksum is rejected.
        assert!(VectorBlock::decode(&encoded[..4]).is_err());
    }

    /// Round-trips a file meta through the footer encoding and checks that
    /// corrupted or truncated footers are rejected.
    #[test]
    fn test_file_meta_footer_roundtrip() {
        let meta = VectorFileMeta {
            block_metas: vec![
                VectorBlockMeta {
                    offset: 0,
                    block_size: 100,
                    vector_count: 3,
                    start_vector_id: 7,
                },
                VectorBlockMeta {
                    offset: 100,
                    block_size: 50,
                    vector_count: 2,
                    start_vector_id: 10,
                },
            ],
        };
        let footer = meta.encode_footer(150);
        let Ok(decoded) = VectorFileMeta::decode_footer(&footer) else {
            panic!("failed to decode a freshly encoded footer");
        };
        assert_eq!(
            format!("{:?}", meta.block_metas),
            format!("{:?}", decoded.block_metas)
        );

        // Corrupting the encoded meta must fail checksum verification.
        let mut corrupted = footer.to_vec();
        corrupted[0] ^= 0xff;
        assert!(VectorFileMeta::decode_footer(&corrupted).is_err());

        // Corrupting the trailing magic number must be rejected too.
        let mut corrupted = footer.to_vec();
        *corrupted.last_mut().unwrap() ^= 0xff;
        assert!(VectorFileMeta::decode_footer(&corrupted).is_err());

        // A buffer shorter than the fixed trailer is rejected.
        assert!(VectorFileMeta::decode_footer(&footer[..8]).is_err());
    }
}