risingwave_storage/hummock/vector/
file.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::mem::take;
16use std::slice;
17use std::sync::Arc;
18
19use bytes::{Buf, BufMut, Bytes, BytesMut};
20use futures::future::BoxFuture;
21use risingwave_hummock_sdk::HummockVectorFileId;
22use risingwave_hummock_sdk::vector_index::VectorFileInfo;
23use risingwave_object_store::object::ObjectStreamingUploader;
24
25use crate::hummock::{HummockError, HummockResult, xxhash64_checksum, xxhash64_verify};
26use crate::vector::VectorRef;
27
/// Format version written into every vector file footer; `decode_footer` rejects any other value.
const VECTOR_FILE_VERSION: u32 = 1;
/// Magic number written as the very last field of every vector file footer and checked first
/// when a footer is decoded.
const VECTOR_FILE_MAGIC_NUM: u32 = 0x3866_cd92;
30
31#[cfg_attr(test, derive(PartialEq, Debug))]
32pub struct VectorBlockInner {
33    dimension: usize,
34    vector_payload: Vec<f32>,
35    info_payload: Vec<u8>,
36    info_offset: Vec<u32>,
37}
38
39impl VectorBlockInner {
40    pub fn count(&self) -> usize {
41        self.info_offset.len()
42    }
43
44    pub fn vec_ref(&self, idx: usize) -> VectorRef<'_> {
45        let start = idx * self.dimension;
46        let end = start + self.dimension;
47        VectorRef::from_slice(&self.vector_payload[start..end])
48    }
49
50    pub fn info(&self, idx: usize) -> &[u8] {
51        let start = if idx == 0 {
52            0
53        } else {
54            self.info_offset[idx - 1] as usize
55        };
56        let end = self.info_offset[idx] as usize;
57        &self.info_payload[start..end]
58    }
59
60    pub fn encoded_len(&self) -> usize {
61        size_of::<u32>() // dimension
62            + size_of::<u32>() // vector count
63            + self.info_offset.len() * size_of::<u32>() // info offsets
64            + self.info_payload.len() // info payload
65            + self.vector_payload.len() * size_of::<f32>() // vector payload
66    }
67}
68
69pub struct VectorBlockBuilder {
70    inner: VectorBlockInner,
71    encoded_len: usize,
72}
73
74impl VectorBlockBuilder {
75    pub fn new(dimension: usize) -> Self {
76        Self {
77            inner: VectorBlockInner {
78                dimension,
79                vector_payload: vec![],
80                info_payload: vec![],
81                info_offset: vec![],
82            },
83            encoded_len: size_of::<u32>() // dimension
84             + size_of::<u32>(), // vector count
85        }
86    }
87
88    pub fn add(&mut self, vec: VectorRef<'_>, info: &[u8]) {
89        let slice = vec.as_slice();
90        assert_eq!(self.inner.dimension, slice.len());
91        self.inner.vector_payload.extend_from_slice(slice);
92        self.inner.info_payload.extend_from_slice(info);
93        let offset = self.inner.info_payload.len();
94        self.inner.info_offset.push(
95            offset
96                .try_into()
97                .unwrap_or_else(|_| panic!("offset {} overflow", offset)),
98        );
99        self.encoded_len += size_of::<u32>() + size_of_val(slice) + info.len()
100    }
101
102    pub fn encoded_len(&self) -> usize {
103        debug_assert_eq!(self.encoded_len, self.inner.encoded_len());
104        self.encoded_len
105    }
106
107    pub fn finish(self) -> Option<VectorBlock> {
108        if !self.inner.info_offset.is_empty() {
109            Some(VectorBlock(Arc::new(self.inner)))
110        } else {
111            None
112        }
113    }
114}
115
116#[cfg_attr(test, derive(PartialEq, Debug))]
117#[derive(Clone)]
118pub struct VectorBlock(Arc<VectorBlockInner>);
119
120impl VectorBlock {
121    pub fn count(&self) -> usize {
122        self.0.count()
123    }
124
125    pub fn vec_ref(&self, idx: usize) -> VectorRef<'_> {
126        self.0.vec_ref(idx)
127    }
128
129    pub fn info(&self, idx: usize) -> &[u8] {
130        self.0.info(idx)
131    }
132
133    /// # Format:
134    ///
135    /// ```plain
136    /// | dimension (u32) | vector_count (u32) |
137    /// | info_offset (u32) * vector_count |
138    /// | info_payload (u8) * (last offset in info_offset) |
139    /// | vector_payload (f32) * (dimension * vector_count)  |
140    /// ```
141    fn encode_payload(&self, mut buf: impl BufMut) {
142        buf.put_u32_le(self.0.dimension.try_into().unwrap());
143        let vector_count = self.0.info_offset.len();
144        assert!(vector_count > 0);
145        buf.put_u32_le(
146            vector_count
147                .try_into()
148                .unwrap_or_else(|_| panic!("vector count {} overflow", vector_count)),
149        );
150        for offset in &self.0.info_offset {
151            buf.put_u32_le(*offset);
152        }
153        let last_offset = *self.0.info_offset.last().unwrap();
154        assert_eq!(last_offset as usize, self.0.info_payload.len());
155        buf.put_slice(&self.0.info_payload);
156        assert_eq!(self.0.vector_payload.len(), self.0.dimension * vector_count);
157        let vector_payload_ptr = self.0.vector_payload.as_slice().as_ptr() as *const u8;
158        // safety: correctly set the size of vector_payload
159        let vector_payload_slice = unsafe {
160            slice::from_raw_parts(
161                vector_payload_ptr,
162                self.0.vector_payload.len() * size_of::<f32>(),
163            )
164        };
165        buf.put_slice(vector_payload_slice);
166    }
167
168    fn decode_payload(mut buf: impl Buf) -> Self {
169        let dimension: usize = buf.get_u32_le().try_into().unwrap();
170        let vector_count = buf.get_u32_le() as usize;
171        let mut info_offset = Vec::with_capacity(vector_count);
172        for _ in 0..vector_count {
173            let offset = buf.get_u32_le();
174            info_offset.push(offset);
175        }
176        let info_payload_len = *info_offset.last().expect("non-empty") as usize;
177        let mut info_payload = vec![0; info_payload_len];
178        buf.copy_to_slice(&mut info_payload);
179        let vector_item_count = dimension * vector_count;
180        let mut vector_payload = Vec::with_capacity(vector_item_count);
181
182        let vector_payload_ptr = vector_payload.spare_capacity_mut().as_mut_ptr() as *mut u8;
183        // safety: no data append to vector_payload, and correctly set the size of vector_payload
184        let vector_payload_slice = unsafe {
185            slice::from_raw_parts_mut(vector_payload_ptr, vector_item_count * size_of::<f32>())
186        };
187        buf.copy_to_slice(vector_payload_slice);
188        // safety: have written correct amount of data
189        unsafe {
190            vector_payload.set_len(vector_item_count);
191        }
192        Self(Arc::new(VectorBlockInner {
193            dimension,
194            vector_payload,
195            info_payload,
196            info_offset,
197        }))
198    }
199
200    fn encode(&self, encoded_block_len: usize) -> Bytes {
201        let encoded_len = encoded_block_len + size_of::<u64>(); // add space for checksum
202        let mut encoded_block = BytesMut::with_capacity(encoded_len);
203        self.encode_payload(&mut encoded_block);
204        let checksum = xxhash64_checksum(encoded_block.as_ref());
205        encoded_block.put_u64_le(checksum);
206        let encoded_block = encoded_block.freeze();
207        debug_assert_eq!(encoded_block.len(), encoded_len);
208        encoded_block
209    }
210
211    pub fn decode(buf: &[u8]) -> HummockResult<Self> {
212        if buf.len() < size_of::<u64>() {
213            return Err(HummockError::decode_error("block too short"));
214        }
215        let back_cursor_end = buf.len();
216        let back_cursor_start = back_cursor_end - size_of::<u64>();
217        let payload = &buf[..back_cursor_start];
218        {
219            let checksum =
220                u64::from_le_bytes(buf[back_cursor_start..back_cursor_end].try_into().unwrap());
221            xxhash64_verify(payload, checksum)?;
222        }
223        Ok(Self::decode_payload(payload))
224    }
225}
226
227impl<'a> IntoIterator for &'a VectorBlock {
228    type Item = (VectorRef<'a>, &'a [u8]);
229
230    type IntoIter = impl Iterator<Item = Self::Item>;
231
232    fn into_iter(self) -> Self::IntoIter {
233        (0..self.0.info_offset.len()).map(|i| (self.vec_ref(i), self.info(i)))
234    }
235}
236
237pub struct VectorBlockMeta {
238    pub offset: usize,
239    pub block_size: usize,
240    pub vector_count: usize,
241    pub start_vector_id: usize,
242}
243
244impl VectorBlockMeta {
245    fn encode(&self, mut buf: impl BufMut) {
246        buf.put_u64_le(self.offset.try_into().unwrap());
247        buf.put_u64_le(self.block_size.try_into().unwrap());
248        buf.put_u64_le(self.vector_count.try_into().unwrap());
249        buf.put_u64_le(self.start_vector_id.try_into().unwrap());
250    }
251
252    fn encoded_len() -> usize {
253        size_of::<u64>() * 4 // offset, block_size, vector_count, start_vector_id
254    }
255
256    fn decode(mut buf: impl Buf) -> Self {
257        let offset = buf.get_u64_le().try_into().unwrap();
258        let block_size = buf.get_u64_le().try_into().unwrap();
259        let vector_count = buf.get_u64_le().try_into().unwrap();
260        let start_vector_id = buf.get_u64_le().try_into().unwrap();
261        Self {
262            offset,
263            block_size,
264            vector_count,
265            start_vector_id,
266        }
267    }
268}
269
270pub struct VectorFileMeta {
271    pub block_metas: Vec<VectorBlockMeta>,
272}
273
274impl VectorFileMeta {
275    fn encode(&self, mut buf: impl BufMut) {
276        buf.put_u32_le(self.block_metas.len().try_into().unwrap());
277        for meta in &self.block_metas {
278            meta.encode(&mut buf);
279        }
280    }
281
282    fn encode_len(&self) -> usize {
283        size_of::<u32>() // block count
284            + self.block_metas.len() * VectorBlockMeta::encoded_len()
285    }
286
287    fn decode(mut buf: impl Buf) -> Self {
288        let block_count = buf.get_u32_le() as usize;
289        let mut block_metas = Vec::with_capacity(block_count);
290        for _ in 0..block_count {
291            block_metas.push(VectorBlockMeta::decode(&mut buf));
292        }
293        Self { block_metas }
294    }
295
296    fn preserved_footer_len() -> usize {
297        size_of::<u64>() // checksum
298            + size_of::<u64>() // meta offset
299            + size_of::<u32>() // version
300            + size_of::<u32>() // magic
301    }
302
303    fn encode_footer(&self, mete_offset: usize) -> Bytes {
304        let encoded_footer_len = self.encode_len() + Self::preserved_footer_len();
305        let mut encoded_footer = BytesMut::with_capacity(encoded_footer_len);
306        self.encode(&mut encoded_footer);
307        let checksum = xxhash64_checksum(encoded_footer.as_ref());
308        encoded_footer.put_u64_le(checksum);
309        encoded_footer.put_u64_le(mete_offset.try_into().unwrap());
310        encoded_footer.put_u32_le(VECTOR_FILE_VERSION);
311        encoded_footer.put_u32_le(VECTOR_FILE_MAGIC_NUM);
312        let encoded_footer = encoded_footer.freeze();
313        debug_assert_eq!(encoded_footer.len(), encoded_footer_len);
314        encoded_footer
315    }
316
317    pub fn decode_footer(buf: &[u8]) -> HummockResult<Self> {
318        if buf.len() < Self::preserved_footer_len() {
319            return Err(HummockError::decode_error("footer too short"));
320        }
321        // get magic number
322        let mut back_cursor_end = buf.len();
323        let mut back_cursor_start = back_cursor_end - size_of::<u32>();
324        {
325            let magic =
326                u32::from_le_bytes(buf[back_cursor_start..back_cursor_end].try_into().unwrap());
327            if magic != VECTOR_FILE_MAGIC_NUM {
328                return Err(HummockError::magic_mismatch(VECTOR_FILE_MAGIC_NUM, magic));
329            }
330        }
331        // get file version
332        back_cursor_end = back_cursor_start;
333        back_cursor_start = back_cursor_end - size_of::<u32>();
334        {
335            let file_version =
336                u32::from_le_bytes(buf[back_cursor_start..back_cursor_end].try_into().unwrap());
337            if file_version != VECTOR_FILE_VERSION {
338                return Err(HummockError::invalid_format_version(file_version));
339            }
340        }
341
342        // skip meta offset
343        back_cursor_end = back_cursor_start;
344        back_cursor_start = back_cursor_end - size_of::<u64>();
345
346        // get and check checksum
347        back_cursor_end = back_cursor_start;
348        back_cursor_start = back_cursor_end - size_of::<u64>();
349        let payload = &buf[..back_cursor_start];
350        {
351            let checksum =
352                u64::from_le_bytes(buf[back_cursor_start..back_cursor_end].try_into().unwrap());
353            xxhash64_verify(payload, checksum)?;
354        }
355
356        Ok(Self::decode(payload))
357    }
358}
359
360pub type NewStreamingUploaderFn = Box<
361    dyn Fn() -> BoxFuture<'static, HummockResult<(HummockVectorFileId, ObjectStreamingUploader)>>
362        + Send
363        + Sync,
364>;
365
366pub struct VectorFileBuilder {
367    dimension: usize,
368    new_uploader: NewStreamingUploaderFn,
369    max_block_size: usize,
370
371    // builder state
372    block_metas: Vec<VectorBlockMeta>,
373    blocks: Vec<VectorBlock>,
374    building_block: Option<(VectorBlockBuilder, usize)>,
375    next_vector_id: usize,
376    next_block_offset: usize,
377    uploader: Option<(HummockVectorFileId, ObjectStreamingUploader)>,
378}
379
380impl VectorFileBuilder {
381    pub fn new(
382        dimension: usize,
383        new_uploader: NewStreamingUploaderFn,
384        next_vector_id: usize,
385        max_block_size: usize,
386    ) -> Self {
387        Self {
388            dimension,
389            new_uploader,
390            max_block_size,
391            block_metas: Vec::new(),
392            blocks: Vec::new(),
393            building_block: None,
394            uploader: None,
395            next_vector_id,
396            next_block_offset: 0,
397        }
398    }
399
400    pub fn add(&mut self, vec: VectorRef<'_>, info: &[u8]) {
401        let (builder, _) = self
402            .building_block
403            .get_or_insert_with(|| (VectorBlockBuilder::new(self.dimension), self.next_vector_id));
404        builder.add(vec, info);
405        self.next_vector_id += 1;
406    }
407
408    pub async fn finish(
409        &mut self,
410    ) -> HummockResult<Option<(VectorFileInfo, Vec<VectorBlock>, VectorFileMeta)>> {
411        self.flush_inner().await?;
412        assert!(self.building_block.is_none(), "unfinished block builder");
413        let ret = if let Some((object_id, mut uploader)) = self.uploader.take() {
414            assert!(!self.block_metas.is_empty());
415            let start_vector_id = self.block_metas.get(0).expect("non-empty").start_vector_id;
416            let meta = VectorFileMeta {
417                block_metas: take(&mut self.block_metas),
418            };
419            let meta_offset = self.next_block_offset;
420            let encoded_footer = meta.encode_footer(meta_offset);
421            let file_size = meta_offset + encoded_footer.len();
422            uploader.write_bytes(encoded_footer).await?;
423            uploader.finish().await?;
424            let file_info = VectorFileInfo {
425                object_id,
426                file_size: file_size.try_into().unwrap(),
427                start_vector_id,
428                vector_count: self.blocks.iter().map(|b| b.count()).sum::<usize>(),
429                meta_offset,
430            };
431            self.next_block_offset = 0;
432            Ok(Some((file_info, take(&mut self.blocks), meta)))
433        } else {
434            Ok(None)
435        };
436        assert!(self.is_empty(), "builder not empty after finish");
437        assert_eq!(
438            self.next_block_offset, 0,
439            "next_block_offset should be zero after finish"
440        );
441        ret
442    }
443
444    pub fn is_empty(&self) -> bool {
445        self.building_block.is_none() && self.blocks.is_empty() && self.block_metas.is_empty()
446    }
447
448    pub fn next_vector_id(&self) -> usize {
449        self.next_vector_id
450    }
451
452    pub async fn try_flush(&mut self) -> HummockResult<()> {
453        if let Some((builder, _)) = &self.building_block
454            && builder.encoded_len() >= self.max_block_size
455        {
456            self.flush_inner().await?;
457        }
458        Ok(())
459    }
460
461    async fn flush_inner(&mut self) -> HummockResult<()> {
462        if let Some((builder, start_vector_id)) = self.building_block.take() {
463            let encoded_block_len = builder.encoded_len();
464            if let Some(block) = builder.finish() {
465                let (_, uploader) = match &mut self.uploader {
466                    Some(uploader) => uploader,
467                    None => self.uploader.insert((self.new_uploader)().await?),
468                };
469                let encoded_block = block.encode(encoded_block_len);
470                let block_meta = VectorBlockMeta {
471                    offset: self.next_block_offset,
472                    block_size: encoded_block.len(),
473                    vector_count: block.count(),
474                    start_vector_id,
475                };
476                self.next_block_offset += encoded_block.len();
477                uploader.write_bytes(encoded_block).await?;
478                self.block_metas.push(block_meta);
479                self.blocks.push(block);
480            }
481        }
482        Ok(())
483    }
484}
485
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use risingwave_common::util::iter_util::ZipEqDebug;

    use crate::hummock::vector::file::{
        VectorBlock, VectorBlockBuilder, VectorBlockMeta, VectorFileMeta,
    };
    use crate::vector::test_utils::{gen_info, gen_vector};

    const VECTOR_DIM: usize = 128;

    #[test]
    fn test_basic() {
        let input = (0..200)
            .map(|i| (gen_vector(VECTOR_DIM), gen_info(i)))
            .collect_vec();
        let mut builder = VectorBlockBuilder::new(VECTOR_DIM);
        for (vec, info) in &input {
            builder.add(vec.to_ref(), info);
        }
        let expected_encoded_len = builder.encoded_len();
        let block = builder.finish().unwrap();
        let mut encoded_block = Vec::new();
        block.encode_payload(&mut encoded_block);
        assert_eq!(expected_encoded_len, encoded_block.len());
        let decoded_block = VectorBlock::decode_payload(encoded_block.as_slice());
        assert_eq!(block, decoded_block);
        for ((expected_vec, expected_info), (actual_vec, actual_info)) in
            input.iter().zip_eq_debug(&block)
        {
            assert_eq!(expected_vec.to_ref(), actual_vec);
            assert_eq!(expected_info.iter().as_slice(), actual_info);
        }
    }

    /// Round-trips a block through the checksum-protected `encode`/`decode` path and checks
    /// that corrupted or truncated input is rejected.
    #[test]
    fn test_encode_decode_with_checksum() {
        let mut builder = VectorBlockBuilder::new(VECTOR_DIM);
        for i in 0..50 {
            builder.add(gen_vector(VECTOR_DIM).to_ref(), &gen_info(i));
        }
        let encoded_len = builder.encoded_len();
        let block = builder.finish().unwrap();
        let encoded = block.encode(encoded_len);
        let decoded = VectorBlock::decode(&encoded).unwrap();
        assert_eq!(block, decoded);

        let mut corrupted = encoded.to_vec();
        corrupted[0] ^= 0xff;
        assert!(VectorBlock::decode(&corrupted).is_err());

        assert!(VectorBlock::decode(&encoded[..4]).is_err());
    }

    /// Round-trips a file footer and checks that a wrong magic number is rejected.
    #[test]
    fn test_footer_roundtrip() {
        let meta = VectorFileMeta {
            block_metas: (0..3)
                .map(|i| VectorBlockMeta {
                    offset: i * 1000,
                    block_size: 1000,
                    vector_count: 10,
                    start_vector_id: 42 + i * 10,
                })
                .collect_vec(),
        };
        let footer = meta.encode_footer(3000);
        let decoded = VectorFileMeta::decode_footer(&footer).unwrap();
        for (expected, actual) in meta
            .block_metas
            .iter()
            .zip_eq_debug(&decoded.block_metas)
        {
            assert_eq!(expected.offset, actual.offset);
            assert_eq!(expected.block_size, actual.block_size);
            assert_eq!(expected.vector_count, actual.vector_count);
            assert_eq!(expected.start_vector_id, actual.start_vector_id);
        }

        let mut bad_magic = footer.to_vec();
        *bad_magic.last_mut().unwrap() ^= 0xff;
        assert!(VectorFileMeta::decode_footer(&bad_magic).is_err());
    }

    #[test]
    fn test_empty_builder() {
        let builder = VectorBlockBuilder::new(VECTOR_DIM);
        assert!(builder.finish().is_none());
    }
}