risingwave_storage/hummock/vector/writer/
hnsw.rs1use std::mem::take;
16use std::sync::Arc;
17
18use bytes::{Bytes, BytesMut};
19use prost::Message;
20use rand::SeedableRng;
21use rand::rngs::StdRng;
22use risingwave_common::array::VectorRef;
23use risingwave_common::catalog::TableId;
24use risingwave_common::dispatch_distance_measurement;
25use risingwave_common::metrics::LabelGuardedIntGauge;
26use risingwave_common::vector::distance::DistanceMeasurement;
27use risingwave_hummock_sdk::HummockObjectId;
28use risingwave_hummock_sdk::vector_index::{
29 HnswFlatIndex, HnswFlatIndexAdd, HnswGraphFileInfo, VectorFileInfo, VectorStoreInfoDelta,
30};
31use risingwave_pb::hummock::PbHnswGraph;
32
33use crate::hummock::vector::file::{FileVectorStore, FileVectorStoreCtx};
34use crate::hummock::vector::monitor::report_hnsw_stat;
35use crate::hummock::vector::writer::VectorObjectIdManagerRef;
36use crate::hummock::{HummockResult, SstableStoreRef};
37use crate::monitor::HummockStateStoreMetrics;
38use crate::opts::StorageOpts;
39use crate::vector::hnsw::{
40 HnswBuilderOptions, HnswGraphBuilder, VectorAccessor, insert_graph, new_node,
41};
42
43pub(crate) struct HnswFlatIndexWriter {
44 measure: DistanceMeasurement,
45 options: HnswBuilderOptions,
46 sstable_store: SstableStoreRef,
47 object_id_manager: VectorObjectIdManagerRef,
48 stats: HnswFlatIndexWriterStats,
49
50 vector_store: FileVectorStore,
51 ctx: FileVectorStoreCtx,
52 next_pending_vector_id: usize,
53 graph_builder: Option<HnswGraphBuilder>,
54 unseal_vector_files: Vec<VectorFileInfo>,
55 flushed_graph_file: Option<HnswGraphFileInfo>,
56 rng: StdRng,
57}
58
59impl HnswFlatIndexWriter {
60 pub(crate) async fn new(
61 table_id: TableId,
62 index: &HnswFlatIndex,
63 dimension: usize,
64 measure: DistanceMeasurement,
65 sstable_store: SstableStoreRef,
66 object_id_manager: VectorObjectIdManagerRef,
67 stats: Arc<HummockStateStoreMetrics>,
68 storage_opts: &StorageOpts,
69 ) -> HummockResult<Self> {
70 let stats = HnswFlatIndexWriterStats::new(table_id, stats);
71 let mut ctx = FileVectorStoreCtx::default();
72 let graph_builder = if let Some(graph_file) = &index.graph_file {
73 stats.hnsw_file_size.set(graph_file.file_size as _);
74 Some(HnswGraphBuilder::from_protobuf(
75 &*sstable_store
76 .get_hnsw_graph(graph_file, &mut ctx.stats)
77 .await?,
78 index.config.m as _,
79 ))
80 } else {
81 None
82 };
83
84 let mut writer = Self {
85 measure,
86 options: HnswBuilderOptions {
87 m: index.config.m.try_into().unwrap(),
88 ef_construction: index.config.ef_construction.try_into().unwrap(),
89 max_level: index.config.max_level.try_into().unwrap(),
90 },
91 vector_store: FileVectorStore::new_for_writer(
92 index,
93 dimension,
94 sstable_store.clone(),
95 object_id_manager.clone(),
96 storage_opts,
97 ),
98 ctx,
99 sstable_store,
100 object_id_manager,
101 graph_builder,
102 unseal_vector_files: vec![],
103 flushed_graph_file: None,
104 rng: StdRng::from_os_rng(),
105 next_pending_vector_id: index.vector_store_info.next_vector_id,
106 stats,
107 };
108
109 writer.report_index_stats();
110
111 Ok(writer)
112 }
113
114 pub(crate) fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> HummockResult<()> {
115 self.vector_store
116 .building_vectors
117 .as_mut()
118 .expect("for write")
119 .file_builder
120 .add(vec, &info);
121 Ok(())
122 }
123
124 pub(crate) fn seal_current_epoch(&mut self) -> Option<HnswFlatIndexAdd> {
125 let building_vectors = &mut self
126 .vector_store
127 .building_vectors
128 .as_mut()
129 .expect("for write");
130 assert!(building_vectors.file_builder.is_empty());
131 let added_vector_files = take(&mut self.unseal_vector_files);
132 if added_vector_files.is_empty() {
133 assert_eq!(self.flushed_graph_file, None);
134 return None;
135 }
136 let next_vector_id = building_vectors.file_builder.next_vector_id();
137 self.report_index_stats();
138 let new_graph_info = self
139 .flushed_graph_file
140 .take()
141 .expect("should have new graph info when having new data");
142 Some(HnswFlatIndexAdd {
143 vector_store_info_delta: VectorStoreInfoDelta {
144 next_vector_id,
145 added_vector_files,
146 },
147 graph_file: new_graph_info,
148 })
149 }
150
151 pub(crate) async fn flush(&mut self) -> HummockResult<usize> {
152 self.add_pending_vectors_to_graph().await?;
153 let new_file = self.vector_store.flush().await?;
154 if let Some(new_file) = new_file {
155 let graph_builder = self
156 .graph_builder
157 .as_ref()
158 .expect("builder should exist when having newly flushed data");
159 let pb_graph = graph_builder.to_protobuf();
160 let mut buffer = BytesMut::with_capacity(pb_graph.encoded_len());
161 PbHnswGraph::encode(&pb_graph, &mut buffer).unwrap();
162 let encoded_graph = buffer.freeze();
163 let size = encoded_graph.len();
164 let object_id = self
165 .object_id_manager
166 .get_new_vector_object_id()
167 .await?
168 .into();
169 let path = self
170 .sstable_store
171 .get_object_data_path(HummockObjectId::HnswGraphFile(object_id));
172 self.sstable_store
173 .store()
174 .upload(&path, encoded_graph)
175 .await?;
176 self.sstable_store
177 .insert_hnsw_graph_cache(object_id, pb_graph);
178 self.flushed_graph_file = Some(HnswGraphFileInfo {
179 object_id,
180 file_size: size as _,
181 });
182 let file_size = new_file.file_size as _;
183 self.unseal_vector_files.push(new_file);
184 Ok(file_size)
185 } else {
186 Ok(0)
187 }
188 }
189
190 pub(crate) async fn try_flush(&mut self) -> HummockResult<()> {
191 self.vector_store
192 .building_vectors
193 .as_mut()
194 .expect("for write")
195 .file_builder
196 .try_flush()
197 .await?;
198 self.add_pending_vectors_to_graph().await
199 }
200
201 async fn add_pending_vectors_to_graph(&mut self) -> HummockResult<()> {
202 let building_vectors = self
203 .vector_store
204 .building_vectors
205 .as_ref()
206 .expect("for write");
207 let mut stats = Vec::with_capacity(
208 building_vectors.file_builder.next_vector_id() - self.next_pending_vector_id,
209 );
210 for i in self.next_pending_vector_id..building_vectors.file_builder.next_vector_id() {
211 let node = new_node(&self.options, &mut self.rng);
212 if let Some(graph_builder) = &mut self.graph_builder {
213 dispatch_distance_measurement!(&self.measure, M, {
214 let stat = insert_graph::<M, _>(
215 &self.vector_store,
216 &mut self.ctx,
217 graph_builder,
218 node,
219 building_vectors.file_builder.get_vector(i).vec_ref(),
220 self.options.ef_construction,
221 )
222 .await?;
223 stats.push(stat);
224 });
225 } else {
226 self.graph_builder = Some(HnswGraphBuilder::first(node));
227 }
228 }
229 take(&mut self.ctx.stats).report(self.stats.table_id, "hnsw_write", &self.stats.stats);
230 report_hnsw_stat(
231 &self.stats.stats,
232 self.stats.table_id,
233 "hnsw_write",
234 self.options.m,
235 self.options.ef_construction,
236 stats,
237 );
238 self.next_pending_vector_id = building_vectors.file_builder.next_vector_id();
239 Ok(())
240 }
241
242 fn report_index_stats(&mut self) {
243 if let Some(graph) = &self.graph_builder {
244 for new_level_idx in self.stats.level_node_count.len()..graph.level_node_count().len() {
245 self.stats
246 .level_node_count
247 .push(self.stats.new_level_node_gauge(new_level_idx));
248 }
249 for (level_idx, node_count) in graph.level_node_count().iter().enumerate() {
250 self.stats.level_node_count[level_idx].set(*node_count as _);
251 }
252 let total_vector_file_data_size: usize = self
253 .vector_store
254 .flushed_vector_files()
255 .iter()
256 .map(|file| file.file_size as usize)
257 .sum();
258 let total_vector_file_meta_size: usize = self
259 .vector_store
260 .flushed_vector_files()
261 .iter()
262 .map(|file| file.file_size as usize - file.meta_offset)
263 .sum();
264 self.stats
265 .vector_file_count
266 .set(self.vector_store.flushed_vector_files().len() as _);
267 self.stats
268 .vector_file_data_size
269 .set(total_vector_file_data_size as _);
270 self.stats
271 .vector_file_meta_size
272 .set(total_vector_file_meta_size as _);
273 if let Some(graph_info) = &self.flushed_graph_file {
274 self.stats.hnsw_file_size.set(graph_info.file_size as _);
275 }
276 }
277 }
278}
279
280struct HnswFlatIndexWriterStats {
281 table_id: TableId,
282 stats: Arc<HummockStateStoreMetrics>,
283 level_node_count: Vec<LabelGuardedIntGauge>,
284 vector_file_count: LabelGuardedIntGauge,
285 vector_file_data_size: LabelGuardedIntGauge,
286 vector_file_meta_size: LabelGuardedIntGauge,
287 hnsw_file_size: LabelGuardedIntGauge,
288}
289
290impl HnswFlatIndexWriterStats {
291 fn new_level_node_gauge(&self, level_idx: usize) -> LabelGuardedIntGauge {
292 let table_id_label = format!("{}", self.table_id);
293 self.stats
294 .vector_hnsw_graph_level_node_count
295 .with_guarded_label_values(&[
296 table_id_label.as_str(),
297 format!("{}", level_idx).as_str(),
298 ])
299 }
300
301 fn new(table_id: TableId, stats: Arc<HummockStateStoreMetrics>) -> Self {
302 let table_id_label = format!("{}", table_id);
303 let vector_file_count = stats
304 .vector_index_file_count
305 .with_guarded_label_values(&[table_id_label.as_str()]);
306 let vector_file_data_size = stats
307 .vector_index_file_size
308 .with_guarded_label_values(&[table_id_label.as_str(), "vector_file_data"]);
309 let vector_file_meta_size = stats
310 .vector_index_file_size
311 .with_guarded_label_values(&[table_id_label.as_str(), "vector_file_meta"]);
312 let hnsw_file_size = stats
313 .vector_index_file_size
314 .with_guarded_label_values(&[table_id_label.as_str(), "hnsw_graph_file"]);
315
316 Self {
317 table_id,
318 stats,
319 level_node_count: vec![],
320 vector_file_count,
321 vector_file_data_size,
322 vector_file_meta_size,
323 hnsw_file_size,
324 }
325 }
326}