risingwave_storage/hummock/vector/writer/
hnsw.rs1use std::mem::take;
16
17use bytes::{Bytes, BytesMut};
18use prost::Message;
19use rand::SeedableRng;
20use rand::rngs::StdRng;
21use risingwave_common::dispatch_distance_measurement;
22use risingwave_common::vector::distance::DistanceMeasurement;
23use risingwave_hummock_sdk::HummockObjectId;
24use risingwave_hummock_sdk::vector_index::{
25 HnswFlatIndex, HnswFlatIndexAdd, HnswGraphFileInfo, VectorFileInfo, VectorStoreInfoDelta,
26};
27use risingwave_pb::hummock::PbHnswGraph;
28
29use crate::hummock::vector::file::FileVectorStore;
30use crate::hummock::vector::writer::VectorObjectIdManagerRef;
31use crate::hummock::{HummockResult, SstableStoreRef};
32use crate::opts::StorageOpts;
33use crate::store::Vector;
34use crate::vector::hnsw::{
35 HnswBuilderOptions, HnswGraphBuilder, VectorAccessor, insert_graph, new_node,
36};
37
38pub(crate) struct HnswFlatIndexWriter {
39 measure: DistanceMeasurement,
40 options: HnswBuilderOptions,
41 sstable_store: SstableStoreRef,
42 object_id_manager: VectorObjectIdManagerRef,
43
44 vector_store: FileVectorStore,
45 next_pending_vector_id: usize,
46 graph_builder: Option<HnswGraphBuilder>,
47 unseal_vector_files: Vec<VectorFileInfo>,
48 flushed_graph_file: Option<HnswGraphFileInfo>,
49 rng: StdRng,
50}
51
52impl HnswFlatIndexWriter {
53 pub(crate) async fn new(
54 index: &HnswFlatIndex,
55 dimension: usize,
56 measure: DistanceMeasurement,
57 sstable_store: SstableStoreRef,
58 object_id_manager: VectorObjectIdManagerRef,
59 storage_opts: &StorageOpts,
60 ) -> HummockResult<Self> {
61 let graph_builder = if let Some(graph_file) = &index.graph_file {
62 Some(HnswGraphBuilder::from_protobuf(
63 &*sstable_store.get_hnsw_graph(graph_file).await?,
64 index.config.m as _,
65 ))
66 } else {
67 None
68 };
69 Ok(Self {
70 measure,
71 options: HnswBuilderOptions {
72 m: index.config.m.try_into().unwrap(),
73 ef_construction: index.config.ef_construction.try_into().unwrap(),
74 max_level: index.config.max_level.try_into().unwrap(),
75 },
76 vector_store: FileVectorStore::new_for_writer(
77 index,
78 dimension,
79 sstable_store.clone(),
80 object_id_manager.clone(),
81 storage_opts,
82 ),
83 sstable_store,
84 object_id_manager,
85 graph_builder,
86 unseal_vector_files: vec![],
87 flushed_graph_file: None,
88 rng: StdRng::from_os_rng(),
89 next_pending_vector_id: index.vector_store_info.next_vector_id,
90 })
91 }
92
93 pub(crate) fn insert(&mut self, vec: Vector, info: Bytes) -> HummockResult<()> {
94 self.vector_store
95 .building_vectors
96 .as_mut()
97 .expect("for write")
98 .file_builder
99 .add(vec.to_ref(), &info);
100 Ok(())
101 }
102
103 pub(crate) fn seal_current_epoch(&mut self) -> Option<HnswFlatIndexAdd> {
104 let building_vectors = &mut self
105 .vector_store
106 .building_vectors
107 .as_mut()
108 .expect("for write");
109 assert!(building_vectors.file_builder.is_empty());
110 let added_vector_files = take(&mut self.unseal_vector_files);
111 if added_vector_files.is_empty() {
112 assert_eq!(self.flushed_graph_file, None);
113 return None;
114 }
115 let new_graph_info = self
116 .flushed_graph_file
117 .take()
118 .expect("should have new graph info when having new data");
119 Some(HnswFlatIndexAdd {
120 vector_store_info_delta: VectorStoreInfoDelta {
121 next_vector_id: building_vectors.file_builder.next_vector_id(),
122 added_vector_files,
123 },
124 graph_file: new_graph_info,
125 })
126 }
127
128 pub(crate) async fn flush(&mut self) -> HummockResult<usize> {
129 self.add_pending_vectors_to_graph().await?;
130 let new_file = self.vector_store.flush().await?;
131 if let Some(new_file) = new_file {
132 let graph_builder = self
133 .graph_builder
134 .as_ref()
135 .expect("builder should exist when having newly flushed data");
136 let pb_graph = graph_builder.to_protobuf();
137 let mut buffer = BytesMut::with_capacity(pb_graph.encoded_len());
138 PbHnswGraph::encode(&pb_graph, &mut buffer).unwrap();
139 let encoded_graph = buffer.freeze();
140 let size = encoded_graph.len();
141 let object_id = self
142 .object_id_manager
143 .get_new_vector_object_id()
144 .await?
145 .into();
146 let path = self
147 .sstable_store
148 .get_object_data_path(HummockObjectId::HnswGraphFile(object_id));
149 self.sstable_store
150 .store()
151 .upload(&path, encoded_graph)
152 .await?;
153 self.sstable_store
154 .insert_hnsw_graph_cache(object_id, pb_graph);
155 self.flushed_graph_file = Some(HnswGraphFileInfo {
156 object_id,
157 file_size: size as _,
158 });
159 let file_size = new_file.file_size as _;
160 self.unseal_vector_files.push(new_file);
161 Ok(file_size)
162 } else {
163 Ok(0)
164 }
165 }
166
167 pub(crate) async fn try_flush(&mut self) -> HummockResult<()> {
168 self.vector_store
169 .building_vectors
170 .as_mut()
171 .expect("for write")
172 .file_builder
173 .try_flush()
174 .await?;
175 self.add_pending_vectors_to_graph().await
176 }
177
178 async fn add_pending_vectors_to_graph(&mut self) -> HummockResult<()> {
179 let building_vectors = self
180 .vector_store
181 .building_vectors
182 .as_ref()
183 .expect("for write");
184 for i in self.next_pending_vector_id..building_vectors.file_builder.next_vector_id() {
185 let node = new_node(&self.options, &mut self.rng);
186 if let Some(graph_builder) = &mut self.graph_builder {
187 dispatch_distance_measurement!(&self.measure, M, {
188 insert_graph::<M>(
189 &self.vector_store,
190 graph_builder,
191 node,
192 building_vectors.file_builder.get_vector(i).vec_ref(),
193 self.options.ef_construction,
194 )
195 .await?;
196 });
197 } else {
198 self.graph_builder = Some(HnswGraphBuilder::first(node));
199 }
200 }
201 self.next_pending_vector_id = building_vectors.file_builder.next_vector_id();
202 Ok(())
203 }
204}