risingwave_storage/hummock/vector/writer/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::mem::take;
16use std::sync::Arc;
17
18use bytes::Bytes;
19use futures::FutureExt;
20use risingwave_hummock_sdk::vector_index::{
21    FlatIndexAdd, VectorFileInfo, VectorIndex, VectorIndexAdd, VectorIndexImpl,
22    VectorStoreInfoDelta,
23};
24use risingwave_hummock_sdk::{HummockObjectId, HummockRawObjectId};
25
26use crate::hummock::vector::file::VectorFileBuilder;
27use crate::hummock::{HummockResult, ObjectIdManager, ObjectIdManagerRef, SstableStoreRef};
28use crate::opts::StorageOpts;
29use crate::vector::Vector;
30
/// Source of raw object ids for newly created vector files.
///
/// Implementors must be `Send + Sync`; the trait is consumed as a trait
/// object through [`VectorObjectIdManagerRef`].
#[async_trait::async_trait]
pub trait VectorObjectIdManager: Send + Sync {
    /// Asynchronously obtains a new raw object id.
    ///
    /// # Errors
    ///
    /// Returns the implementor's error if no id could be obtained.
    async fn get_new_vector_object_id(&self) -> HummockResult<HummockRawObjectId>;
}
35
/// Shared, dynamically dispatched handle to a [`VectorObjectIdManager`].
pub type VectorObjectIdManagerRef = Arc<dyn VectorObjectIdManager>;
37
38#[async_trait::async_trait]
39impl VectorObjectIdManager for ObjectIdManager {
40    async fn get_new_vector_object_id(&self) -> HummockResult<HummockRawObjectId> {
41        self.get_new_object_id().await
42    }
43}
44
45pub(crate) fn new_vector_file_builder(
46    dimension: usize,
47    next_vector_id: usize,
48    sstable_store: SstableStoreRef,
49    object_id_manager: VectorObjectIdManagerRef,
50    storage_opts: &StorageOpts,
51) -> VectorFileBuilder {
52    VectorFileBuilder::new(
53        dimension,
54        Box::new(move || {
55            let object_id_manager = object_id_manager.clone();
56            let sstable_store = sstable_store.clone();
57            async move {
58                let object_id = object_id_manager.get_new_vector_object_id().await?.into();
59                let path =
60                    sstable_store.get_object_data_path(HummockObjectId::VectorFile(object_id));
61                let uploader = sstable_store.create_streaming_uploader(&path).await?;
62                Ok((object_id, uploader))
63            }
64            .boxed()
65        }),
66        next_vector_id,
67        storage_opts.vector_file_block_size_kb * 1024,
68    )
69}
70
/// Writer that feeds vectors into a [`VectorFileBuilder`] and keeps track of
/// the vector files that have been flushed but not yet sealed into an epoch.
pub(crate) struct VectorWriterImpl {
    /// Info of files produced by `flush`; drained by `seal_current_epoch`.
    flushed_vector_files: Vec<VectorFileInfo>,
    /// Store whose vector cache is populated when a file is flushed.
    sstable_store: SstableStoreRef,
    /// Builder that receives every inserted vector.
    vector_file_builder: VectorFileBuilder,
}
76
77impl VectorWriterImpl {
78    pub(crate) fn new(
79        index: &VectorIndex,
80        sstable_store: SstableStoreRef,
81        object_id_manager: ObjectIdManagerRef,
82        storage_opts: &StorageOpts,
83    ) -> Self {
84        let VectorIndexImpl::Flat(flat_index) = &index.inner;
85        Self {
86            flushed_vector_files: vec![],
87            sstable_store: sstable_store.clone(),
88            vector_file_builder: new_vector_file_builder(
89                index.dimension,
90                flat_index.vector_store_info.next_vector_id,
91                sstable_store,
92                object_id_manager,
93                storage_opts,
94            ),
95        }
96    }
97
98    pub(crate) fn insert(&mut self, vec: Vector, info: Bytes) -> HummockResult<()> {
99        self.vector_file_builder.add(vec.to_ref(), info.as_ref());
100        Ok(())
101    }
102
103    pub(crate) fn seal_current_epoch(&mut self) -> Option<VectorIndexAdd> {
104        assert!(self.vector_file_builder.is_empty());
105        if self.flushed_vector_files.is_empty() {
106            return None;
107        }
108        Some(VectorIndexAdd::Flat(FlatIndexAdd {
109            vector_store_info_delta: VectorStoreInfoDelta {
110                next_vector_id: self.vector_file_builder.next_vector_id(),
111                added_vector_files: take(&mut self.flushed_vector_files),
112            },
113        }))
114    }
115
116    pub(crate) async fn flush(&mut self) -> HummockResult<usize> {
117        if let Some((file_info, blocks, meta)) = self.vector_file_builder.finish().await? {
118            self.sstable_store
119                .insert_vector_cache(file_info.object_id, meta, blocks);
120            let size = file_info.file_size as _;
121            self.flushed_vector_files.push(file_info);
122            Ok(size)
123        } else {
124            Ok(0)
125        }
126    }
127
128    pub(crate) async fn try_flush(&mut self) -> HummockResult<()> {
129        self.vector_file_builder.try_flush().await
130    }
131}