risingwave_storage/hummock/vector/writer/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod hnsw;
16use std::mem::take;
17use std::sync::Arc;
18
19use bytes::Bytes;
20use futures::FutureExt;
21use hnsw::HnswFlatIndexWriter;
22use risingwave_common::vector::distance::DistanceMeasurement;
23use risingwave_hummock_sdk::vector_index::{
24    FlatIndex, FlatIndexAdd, VectorFileInfo, VectorIndex, VectorIndexAdd, VectorIndexImpl,
25    VectorStoreInfoDelta,
26};
27use risingwave_hummock_sdk::{HummockObjectId, HummockRawObjectId};
28
29use crate::hummock::vector::file::VectorFileBuilder;
30use crate::hummock::{HummockResult, ObjectIdManager, SstableStoreRef};
31use crate::opts::StorageOpts;
32use crate::vector::Vector;
33
34#[async_trait::async_trait]
35pub trait VectorObjectIdManager: Send + Sync {
36    async fn get_new_vector_object_id(&self) -> HummockResult<HummockRawObjectId>;
37}
38
39pub type VectorObjectIdManagerRef = Arc<dyn VectorObjectIdManager>;
40
41#[async_trait::async_trait]
42impl VectorObjectIdManager for ObjectIdManager {
43    async fn get_new_vector_object_id(&self) -> HummockResult<HummockRawObjectId> {
44        self.get_new_object_id().await
45    }
46}
47
48pub(crate) fn new_vector_file_builder(
49    dimension: usize,
50    next_vector_id: usize,
51    sstable_store: SstableStoreRef,
52    object_id_manager: VectorObjectIdManagerRef,
53    storage_opts: &StorageOpts,
54) -> VectorFileBuilder {
55    VectorFileBuilder::new(
56        dimension,
57        Box::new(move || {
58            let object_id_manager = object_id_manager.clone();
59            let sstable_store = sstable_store.clone();
60            async move {
61                let object_id = object_id_manager.get_new_vector_object_id().await?.into();
62                let path =
63                    sstable_store.get_object_data_path(HummockObjectId::VectorFile(object_id));
64                let uploader = sstable_store.create_streaming_uploader(&path).await?;
65                Ok((object_id, uploader))
66            }
67            .boxed()
68        }),
69        next_vector_id,
70        storage_opts.vector_file_block_size_kb * 1024,
71    )
72}
73
74pub(crate) enum VectorWriterImpl {
75    Flat(FlatIndexWriter),
76    HnswFlat(HnswFlatIndexWriter),
77}
78
79impl VectorWriterImpl {
80    pub(crate) async fn new(
81        index: &VectorIndex,
82        sstable_store: SstableStoreRef,
83        object_id_manager: VectorObjectIdManagerRef,
84        storage_opts: &StorageOpts,
85    ) -> HummockResult<Self> {
86        Ok(match &index.inner {
87            VectorIndexImpl::Flat(flat) => VectorWriterImpl::Flat(FlatIndexWriter::new(
88                flat,
89                index.dimension,
90                sstable_store,
91                object_id_manager,
92                storage_opts,
93            )),
94            VectorIndexImpl::HnswFlat(hnsw_flat) => VectorWriterImpl::HnswFlat(
95                HnswFlatIndexWriter::new(
96                    hnsw_flat,
97                    index.dimension,
98                    DistanceMeasurement::from(index.distance_type),
99                    sstable_store,
100                    object_id_manager,
101                    storage_opts,
102                )
103                .await?,
104            ),
105        })
106    }
107
108    pub(crate) fn insert(&mut self, vec: Vector, info: Bytes) -> HummockResult<()> {
109        match self {
110            VectorWriterImpl::Flat(writer) => writer.insert(vec, info),
111            VectorWriterImpl::HnswFlat(writer) => writer.insert(vec, info),
112        }
113    }
114
115    pub(crate) fn seal_current_epoch(&mut self) -> Option<VectorIndexAdd> {
116        match self {
117            VectorWriterImpl::Flat(writer) => writer.seal_current_epoch(),
118            VectorWriterImpl::HnswFlat(writer) => {
119                writer.seal_current_epoch().map(VectorIndexAdd::HnswFlat)
120            }
121        }
122    }
123
124    pub(crate) async fn flush(&mut self) -> HummockResult<usize> {
125        match self {
126            VectorWriterImpl::Flat(writer) => writer.flush().await,
127            VectorWriterImpl::HnswFlat(writer) => writer.flush().await,
128        }
129    }
130
131    pub(crate) async fn try_flush(&mut self) -> HummockResult<()> {
132        match self {
133            VectorWriterImpl::Flat(writer) => writer.try_flush().await,
134            VectorWriterImpl::HnswFlat(writer) => writer.try_flush().await,
135        }
136    }
137}
138
139pub(crate) struct FlatIndexWriter {
140    flushed_vector_files: Vec<VectorFileInfo>,
141    sstable_store: SstableStoreRef,
142    vector_file_builder: VectorFileBuilder,
143}
144
145impl FlatIndexWriter {
146    pub(crate) fn new(
147        index: &FlatIndex,
148        dimension: usize,
149        sstable_store: SstableStoreRef,
150        object_id_manager: VectorObjectIdManagerRef,
151        storage_opts: &StorageOpts,
152    ) -> Self {
153        Self {
154            flushed_vector_files: vec![],
155            sstable_store: sstable_store.clone(),
156            vector_file_builder: new_vector_file_builder(
157                dimension,
158                index.vector_store_info.next_vector_id,
159                sstable_store,
160                object_id_manager,
161                storage_opts,
162            ),
163        }
164    }
165
166    pub(crate) fn insert(&mut self, vec: Vector, info: Bytes) -> HummockResult<()> {
167        self.vector_file_builder.add(vec.to_ref(), info.as_ref());
168        Ok(())
169    }
170
171    pub(crate) fn seal_current_epoch(&mut self) -> Option<VectorIndexAdd> {
172        assert!(self.vector_file_builder.is_empty());
173        if self.flushed_vector_files.is_empty() {
174            return None;
175        }
176        Some(VectorIndexAdd::Flat(FlatIndexAdd {
177            vector_store_info_delta: VectorStoreInfoDelta {
178                next_vector_id: self.vector_file_builder.next_vector_id(),
179                added_vector_files: take(&mut self.flushed_vector_files),
180            },
181        }))
182    }
183
184    pub(crate) async fn flush(&mut self) -> HummockResult<usize> {
185        if let Some((file_info, blocks, meta)) = self.vector_file_builder.finish().await? {
186            self.sstable_store
187                .insert_vector_cache(file_info.object_id, meta, blocks);
188            let size = file_info.file_size as _;
189            self.flushed_vector_files.push(file_info);
190            Ok(size)
191        } else {
192            Ok(0)
193        }
194    }
195
196    pub(crate) async fn try_flush(&mut self) -> HummockResult<()> {
197        self.vector_file_builder.try_flush().await
198    }
199}