risingwave_storage/hummock/vector/writer/
hnsw.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::mem::take;
16
17use bytes::{Bytes, BytesMut};
18use prost::Message;
19use rand::SeedableRng;
20use rand::rngs::StdRng;
21use risingwave_common::dispatch_distance_measurement;
22use risingwave_common::vector::distance::DistanceMeasurement;
23use risingwave_hummock_sdk::HummockObjectId;
24use risingwave_hummock_sdk::vector_index::{
25    HnswFlatIndex, HnswFlatIndexAdd, HnswGraphFileInfo, VectorFileInfo, VectorStoreInfoDelta,
26};
27use risingwave_pb::hummock::PbHnswGraph;
28
29use crate::hummock::vector::file::FileVectorStore;
30use crate::hummock::vector::writer::VectorObjectIdManagerRef;
31use crate::hummock::{HummockResult, SstableStoreRef};
32use crate::opts::StorageOpts;
33use crate::store::Vector;
34use crate::vector::hnsw::{
35    HnswBuilderOptions, HnswGraphBuilder, VectorAccessor, insert_graph, new_node,
36};
37
38pub(crate) struct HnswFlatIndexWriter {
39    measure: DistanceMeasurement,
40    options: HnswBuilderOptions,
41    sstable_store: SstableStoreRef,
42    object_id_manager: VectorObjectIdManagerRef,
43
44    vector_store: FileVectorStore,
45    next_pending_vector_id: usize,
46    graph_builder: Option<HnswGraphBuilder>,
47    unseal_vector_files: Vec<VectorFileInfo>,
48    flushed_graph_file: Option<HnswGraphFileInfo>,
49    rng: StdRng,
50}
51
52impl HnswFlatIndexWriter {
53    pub(crate) async fn new(
54        index: &HnswFlatIndex,
55        dimension: usize,
56        measure: DistanceMeasurement,
57        sstable_store: SstableStoreRef,
58        object_id_manager: VectorObjectIdManagerRef,
59        storage_opts: &StorageOpts,
60    ) -> HummockResult<Self> {
61        let graph_builder = if let Some(graph_file) = &index.graph_file {
62            Some(HnswGraphBuilder::from_protobuf(
63                &*sstable_store.get_hnsw_graph(graph_file).await?,
64                index.config.m as _,
65            ))
66        } else {
67            None
68        };
69        Ok(Self {
70            measure,
71            options: HnswBuilderOptions {
72                m: index.config.m.try_into().unwrap(),
73                ef_construction: index.config.ef_construction.try_into().unwrap(),
74                max_level: index.config.max_level.try_into().unwrap(),
75            },
76            vector_store: FileVectorStore::new_for_writer(
77                index,
78                dimension,
79                sstable_store.clone(),
80                object_id_manager.clone(),
81                storage_opts,
82            ),
83            sstable_store,
84            object_id_manager,
85            graph_builder,
86            unseal_vector_files: vec![],
87            flushed_graph_file: None,
88            rng: StdRng::from_os_rng(),
89            next_pending_vector_id: index.vector_store_info.next_vector_id,
90        })
91    }
92
93    pub(crate) fn insert(&mut self, vec: Vector, info: Bytes) -> HummockResult<()> {
94        self.vector_store
95            .building_vectors
96            .as_mut()
97            .expect("for write")
98            .file_builder
99            .add(vec.to_ref(), &info);
100        Ok(())
101    }
102
103    pub(crate) fn seal_current_epoch(&mut self) -> Option<HnswFlatIndexAdd> {
104        let building_vectors = &mut self
105            .vector_store
106            .building_vectors
107            .as_mut()
108            .expect("for write");
109        assert!(building_vectors.file_builder.is_empty());
110        let added_vector_files = take(&mut self.unseal_vector_files);
111        if added_vector_files.is_empty() {
112            assert_eq!(self.flushed_graph_file, None);
113            return None;
114        }
115        let new_graph_info = self
116            .flushed_graph_file
117            .take()
118            .expect("should have new graph info when having new data");
119        Some(HnswFlatIndexAdd {
120            vector_store_info_delta: VectorStoreInfoDelta {
121                next_vector_id: building_vectors.file_builder.next_vector_id(),
122                added_vector_files,
123            },
124            graph_file: new_graph_info,
125        })
126    }
127
128    pub(crate) async fn flush(&mut self) -> HummockResult<usize> {
129        self.add_pending_vectors_to_graph().await?;
130        let new_file = self.vector_store.flush().await?;
131        if let Some(new_file) = new_file {
132            let graph_builder = self
133                .graph_builder
134                .as_ref()
135                .expect("builder should exist when having newly flushed data");
136            let pb_graph = graph_builder.to_protobuf();
137            let mut buffer = BytesMut::with_capacity(pb_graph.encoded_len());
138            PbHnswGraph::encode(&pb_graph, &mut buffer).unwrap();
139            let encoded_graph = buffer.freeze();
140            let size = encoded_graph.len();
141            let object_id = self
142                .object_id_manager
143                .get_new_vector_object_id()
144                .await?
145                .into();
146            let path = self
147                .sstable_store
148                .get_object_data_path(HummockObjectId::HnswGraphFile(object_id));
149            self.sstable_store
150                .store()
151                .upload(&path, encoded_graph)
152                .await?;
153            self.sstable_store
154                .insert_hnsw_graph_cache(object_id, pb_graph);
155            self.flushed_graph_file = Some(HnswGraphFileInfo {
156                object_id,
157                file_size: size as _,
158            });
159            let file_size = new_file.file_size as _;
160            self.unseal_vector_files.push(new_file);
161            Ok(file_size)
162        } else {
163            Ok(0)
164        }
165    }
166
167    pub(crate) async fn try_flush(&mut self) -> HummockResult<()> {
168        self.vector_store
169            .building_vectors
170            .as_mut()
171            .expect("for write")
172            .file_builder
173            .try_flush()
174            .await?;
175        self.add_pending_vectors_to_graph().await
176    }
177
178    async fn add_pending_vectors_to_graph(&mut self) -> HummockResult<()> {
179        let building_vectors = self
180            .vector_store
181            .building_vectors
182            .as_ref()
183            .expect("for write");
184        for i in self.next_pending_vector_id..building_vectors.file_builder.next_vector_id() {
185            let node = new_node(&self.options, &mut self.rng);
186            if let Some(graph_builder) = &mut self.graph_builder {
187                dispatch_distance_measurement!(&self.measure, M, {
188                    insert_graph::<M>(
189                        &self.vector_store,
190                        graph_builder,
191                        node,
192                        building_vectors.file_builder.get_vector(i).vec_ref(),
193                        self.options.ef_construction,
194                    )
195                    .await?;
196                });
197            } else {
198                self.graph_builder = Some(HnswGraphBuilder::first(node));
199            }
200        }
201        self.next_pending_vector_id = building_vectors.file_builder.next_vector_id();
202        Ok(())
203    }
204}