risingwave_storage/hummock/vector/writer/
hnsw.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::mem::take;
16use std::sync::Arc;
17
18use bytes::{Bytes, BytesMut};
19use prost::Message;
20use rand::SeedableRng;
21use rand::rngs::StdRng;
22use risingwave_common::array::VectorRef;
23use risingwave_common::catalog::TableId;
24use risingwave_common::dispatch_distance_measurement;
25use risingwave_common::metrics::LabelGuardedIntGauge;
26use risingwave_common::vector::distance::DistanceMeasurement;
27use risingwave_hummock_sdk::HummockObjectId;
28use risingwave_hummock_sdk::vector_index::{
29    HnswFlatIndex, HnswFlatIndexAdd, HnswGraphFileInfo, VectorFileInfo, VectorStoreInfoDelta,
30};
31use risingwave_pb::hummock::PbHnswGraph;
32
33use crate::hummock::vector::file::{FileVectorStore, FileVectorStoreCtx};
34use crate::hummock::vector::monitor::report_hnsw_stat;
35use crate::hummock::vector::writer::VectorObjectIdManagerRef;
36use crate::hummock::{HummockResult, SstableStoreRef};
37use crate::monitor::HummockStateStoreMetrics;
38use crate::opts::StorageOpts;
39use crate::vector::hnsw::{
40    HnswBuilderOptions, HnswGraphBuilder, VectorAccessor, insert_graph, new_node,
41};
42
/// Writer that incrementally builds an HNSW graph on top of a file-based flat vector store.
///
/// Vectors are appended to the vector file currently being built by `insert`. They are added to
/// the in-memory HNSW graph by `try_flush` / `flush`, and `flush` additionally uploads the graph
/// whenever a new vector file is produced. `seal_current_epoch` hands out the vector files and
/// graph file produced since the previous seal.
pub(crate) struct HnswFlatIndexWriter {
    // Distance measurement dispatched on when inserting vectors into the graph.
    measure: DistanceMeasurement,
    // HNSW construction parameters (`m`, `ef_construction`, `max_level`).
    options: HnswBuilderOptions,
    // Used to upload graph files and to populate the HNSW graph cache.
    sstable_store: SstableStoreRef,
    // Allocates object ids for uploaded graph files (also handed to the vector store).
    object_id_manager: VectorObjectIdManagerRef,
    // Per-table metric gauges.
    stats: HnswFlatIndexWriterStats,

    // Vector storage: the flushed vector files plus the file builder currently in progress.
    vector_store: FileVectorStore,
    // Context passed to graph loading and `insert_graph`; its `stats` accumulate and are
    // reported in `add_pending_vectors_to_graph`.
    ctx: FileVectorStoreCtx,
    // Id of the first vector written to the file builder but not yet inserted into the graph.
    next_pending_vector_id: usize,
    // In-memory graph; `None` until the first vector of an empty index is added.
    graph_builder: Option<HnswGraphBuilder>,
    // Vector files flushed since the last `seal_current_epoch`.
    unseal_vector_files: Vec<VectorFileInfo>,
    // Most recent graph file uploaded since the last `seal_current_epoch`, if any.
    flushed_graph_file: Option<HnswGraphFileInfo>,
    // Random source passed to `new_node` (presumably to pick each node's level — confirm).
    rng: StdRng,
}
58
59impl HnswFlatIndexWriter {
60    pub(crate) async fn new(
61        table_id: TableId,
62        index: &HnswFlatIndex,
63        dimension: usize,
64        measure: DistanceMeasurement,
65        sstable_store: SstableStoreRef,
66        object_id_manager: VectorObjectIdManagerRef,
67        stats: Arc<HummockStateStoreMetrics>,
68        storage_opts: &StorageOpts,
69    ) -> HummockResult<Self> {
70        let stats = HnswFlatIndexWriterStats::new(table_id, stats);
71        let mut ctx = FileVectorStoreCtx::default();
72        let graph_builder = if let Some(graph_file) = &index.graph_file {
73            stats.hnsw_file_size.set(graph_file.file_size as _);
74            Some(HnswGraphBuilder::from_protobuf(
75                &*sstable_store
76                    .get_hnsw_graph(graph_file, &mut ctx.stats)
77                    .await?,
78                index.config.m as _,
79            ))
80        } else {
81            None
82        };
83
84        let mut writer = Self {
85            measure,
86            options: HnswBuilderOptions {
87                m: index.config.m.try_into().unwrap(),
88                ef_construction: index.config.ef_construction.try_into().unwrap(),
89                max_level: index.config.max_level.try_into().unwrap(),
90            },
91            vector_store: FileVectorStore::new_for_writer(
92                index,
93                dimension,
94                sstable_store.clone(),
95                object_id_manager.clone(),
96                storage_opts,
97            ),
98            ctx,
99            sstable_store,
100            object_id_manager,
101            graph_builder,
102            unseal_vector_files: vec![],
103            flushed_graph_file: None,
104            rng: StdRng::from_os_rng(),
105            next_pending_vector_id: index.vector_store_info.next_vector_id,
106            stats,
107        };
108
109        writer.report_index_stats();
110
111        Ok(writer)
112    }
113
114    pub(crate) fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> HummockResult<()> {
115        self.vector_store
116            .building_vectors
117            .as_mut()
118            .expect("for write")
119            .file_builder
120            .add(vec, &info);
121        Ok(())
122    }
123
124    pub(crate) fn seal_current_epoch(&mut self) -> Option<HnswFlatIndexAdd> {
125        let building_vectors = &mut self
126            .vector_store
127            .building_vectors
128            .as_mut()
129            .expect("for write");
130        assert!(building_vectors.file_builder.is_empty());
131        let added_vector_files = take(&mut self.unseal_vector_files);
132        if added_vector_files.is_empty() {
133            assert_eq!(self.flushed_graph_file, None);
134            return None;
135        }
136        let next_vector_id = building_vectors.file_builder.next_vector_id();
137        self.report_index_stats();
138        let new_graph_info = self
139            .flushed_graph_file
140            .take()
141            .expect("should have new graph info when having new data");
142        Some(HnswFlatIndexAdd {
143            vector_store_info_delta: VectorStoreInfoDelta {
144                next_vector_id,
145                added_vector_files,
146            },
147            graph_file: new_graph_info,
148        })
149    }
150
151    pub(crate) async fn flush(&mut self) -> HummockResult<usize> {
152        self.add_pending_vectors_to_graph().await?;
153        let new_file = self.vector_store.flush().await?;
154        if let Some(new_file) = new_file {
155            let graph_builder = self
156                .graph_builder
157                .as_ref()
158                .expect("builder should exist when having newly flushed data");
159            let pb_graph = graph_builder.to_protobuf();
160            let mut buffer = BytesMut::with_capacity(pb_graph.encoded_len());
161            PbHnswGraph::encode(&pb_graph, &mut buffer).unwrap();
162            let encoded_graph = buffer.freeze();
163            let size = encoded_graph.len();
164            let object_id = self
165                .object_id_manager
166                .get_new_vector_object_id()
167                .await?
168                .into();
169            let path = self
170                .sstable_store
171                .get_object_data_path(HummockObjectId::HnswGraphFile(object_id));
172            self.sstable_store
173                .store()
174                .upload(&path, encoded_graph)
175                .await?;
176            self.sstable_store
177                .insert_hnsw_graph_cache(object_id, pb_graph);
178            self.flushed_graph_file = Some(HnswGraphFileInfo {
179                object_id,
180                file_size: size as _,
181            });
182            let file_size = new_file.file_size as _;
183            self.unseal_vector_files.push(new_file);
184            Ok(file_size)
185        } else {
186            Ok(0)
187        }
188    }
189
190    pub(crate) async fn try_flush(&mut self) -> HummockResult<()> {
191        self.vector_store
192            .building_vectors
193            .as_mut()
194            .expect("for write")
195            .file_builder
196            .try_flush()
197            .await?;
198        self.add_pending_vectors_to_graph().await
199    }
200
201    async fn add_pending_vectors_to_graph(&mut self) -> HummockResult<()> {
202        let building_vectors = self
203            .vector_store
204            .building_vectors
205            .as_ref()
206            .expect("for write");
207        let mut stats = Vec::with_capacity(
208            building_vectors.file_builder.next_vector_id() - self.next_pending_vector_id,
209        );
210        for i in self.next_pending_vector_id..building_vectors.file_builder.next_vector_id() {
211            let node = new_node(&self.options, &mut self.rng);
212            if let Some(graph_builder) = &mut self.graph_builder {
213                dispatch_distance_measurement!(&self.measure, M, {
214                    let stat = insert_graph::<M, _>(
215                        &self.vector_store,
216                        &mut self.ctx,
217                        graph_builder,
218                        node,
219                        building_vectors.file_builder.get_vector(i).vec_ref(),
220                        self.options.ef_construction,
221                    )
222                    .await?;
223                    stats.push(stat);
224                });
225            } else {
226                self.graph_builder = Some(HnswGraphBuilder::first(node));
227            }
228        }
229        take(&mut self.ctx.stats).report(self.stats.table_id, "hnsw_write", &self.stats.stats);
230        report_hnsw_stat(
231            &self.stats.stats,
232            self.stats.table_id,
233            "hnsw_write",
234            self.options.m,
235            self.options.ef_construction,
236            stats,
237        );
238        self.next_pending_vector_id = building_vectors.file_builder.next_vector_id();
239        Ok(())
240    }
241
242    fn report_index_stats(&mut self) {
243        if let Some(graph) = &self.graph_builder {
244            for new_level_idx in self.stats.level_node_count.len()..graph.level_node_count().len() {
245                self.stats
246                    .level_node_count
247                    .push(self.stats.new_level_node_gauge(new_level_idx));
248            }
249            for (level_idx, node_count) in graph.level_node_count().iter().enumerate() {
250                self.stats.level_node_count[level_idx].set(*node_count as _);
251            }
252            let total_vector_file_data_size: usize = self
253                .vector_store
254                .flushed_vector_files()
255                .iter()
256                .map(|file| file.file_size as usize)
257                .sum();
258            let total_vector_file_meta_size: usize = self
259                .vector_store
260                .flushed_vector_files()
261                .iter()
262                .map(|file| file.file_size as usize - file.meta_offset)
263                .sum();
264            self.stats
265                .vector_file_count
266                .set(self.vector_store.flushed_vector_files().len() as _);
267            self.stats
268                .vector_file_data_size
269                .set(total_vector_file_data_size as _);
270            self.stats
271                .vector_file_meta_size
272                .set(total_vector_file_meta_size as _);
273            if let Some(graph_info) = &self.flushed_graph_file {
274                self.stats.hnsw_file_size.set(graph_info.file_size as _);
275            }
276        }
277    }
278}
279
/// Per-table metric gauges maintained by `HnswFlatIndexWriter`.
struct HnswFlatIndexWriterStats {
    // Table id used as the first label of every gauge.
    table_id: TableId,
    // Metrics the gauges are created from; also passed to the insertion stat reporters.
    stats: Arc<HummockStateStoreMetrics>,
    // One node-count gauge per HNSW graph level, indexed by level; grown on demand.
    level_node_count: Vec<LabelGuardedIntGauge>,
    // Number of flushed vector files.
    vector_file_count: LabelGuardedIntGauge,
    // `vector_index_file_size{.., "vector_file_data"}`: total size of the flushed vector files.
    vector_file_data_size: LabelGuardedIntGauge,
    // `vector_index_file_size{.., "vector_file_meta"}`: total size of their meta sections.
    vector_file_meta_size: LabelGuardedIntGauge,
    // `vector_index_file_size{.., "hnsw_graph_file"}`: size of the latest HNSW graph file.
    hnsw_file_size: LabelGuardedIntGauge,
}
289
290impl HnswFlatIndexWriterStats {
291    fn new_level_node_gauge(&self, level_idx: usize) -> LabelGuardedIntGauge {
292        let table_id_label = format!("{}", self.table_id);
293        self.stats
294            .vector_hnsw_graph_level_node_count
295            .with_guarded_label_values(&[
296                table_id_label.as_str(),
297                format!("{}", level_idx).as_str(),
298            ])
299    }
300
301    fn new(table_id: TableId, stats: Arc<HummockStateStoreMetrics>) -> Self {
302        let table_id_label = format!("{}", table_id);
303        let vector_file_count = stats
304            .vector_index_file_count
305            .with_guarded_label_values(&[table_id_label.as_str()]);
306        let vector_file_data_size = stats
307            .vector_index_file_size
308            .with_guarded_label_values(&[table_id_label.as_str(), "vector_file_data"]);
309        let vector_file_meta_size = stats
310            .vector_index_file_size
311            .with_guarded_label_values(&[table_id_label.as_str(), "vector_file_meta"]);
312        let hnsw_file_size = stats
313            .vector_index_file_size
314            .with_guarded_label_values(&[table_id_label.as_str(), "hnsw_graph_file"]);
315
316        Self {
317            table_id,
318            stats,
319            level_node_count: vec![],
320            vector_file_count,
321            vector_file_data_size,
322            vector_file_meta_size,
323            hnsw_file_size,
324        }
325    }
326}