risingwave_storage/hummock/vector/writer/
mod.rs1mod hnsw;
16use std::mem::take;
17use std::sync::Arc;
18
19use bytes::Bytes;
20use futures::FutureExt;
21use hnsw::HnswFlatIndexWriter;
22use risingwave_common::vector::distance::DistanceMeasurement;
23use risingwave_hummock_sdk::vector_index::{
24 FlatIndex, FlatIndexAdd, VectorFileInfo, VectorIndex, VectorIndexAdd, VectorIndexImpl,
25 VectorStoreInfoDelta,
26};
27use risingwave_hummock_sdk::{HummockObjectId, HummockRawObjectId};
28
29use crate::hummock::vector::file::VectorFileBuilder;
30use crate::hummock::{HummockResult, ObjectIdManager, SstableStoreRef};
31use crate::opts::StorageOpts;
32use crate::vector::Vector;
33
34#[async_trait::async_trait]
35pub trait VectorObjectIdManager: Send + Sync {
36 async fn get_new_vector_object_id(&self) -> HummockResult<HummockRawObjectId>;
37}
38
39pub type VectorObjectIdManagerRef = Arc<dyn VectorObjectIdManager>;
40
41#[async_trait::async_trait]
42impl VectorObjectIdManager for ObjectIdManager {
43 async fn get_new_vector_object_id(&self) -> HummockResult<HummockRawObjectId> {
44 self.get_new_object_id().await
45 }
46}
47
48pub(crate) fn new_vector_file_builder(
49 dimension: usize,
50 next_vector_id: usize,
51 sstable_store: SstableStoreRef,
52 object_id_manager: VectorObjectIdManagerRef,
53 storage_opts: &StorageOpts,
54) -> VectorFileBuilder {
55 VectorFileBuilder::new(
56 dimension,
57 Box::new(move || {
58 let object_id_manager = object_id_manager.clone();
59 let sstable_store = sstable_store.clone();
60 async move {
61 let object_id = object_id_manager.get_new_vector_object_id().await?.into();
62 let path =
63 sstable_store.get_object_data_path(HummockObjectId::VectorFile(object_id));
64 let uploader = sstable_store.create_streaming_uploader(&path).await?;
65 Ok((object_id, uploader))
66 }
67 .boxed()
68 }),
69 next_vector_id,
70 storage_opts.vector_file_block_size_kb * 1024,
71 )
72}
73
74pub(crate) enum VectorWriterImpl {
75 Flat(FlatIndexWriter),
76 HnswFlat(HnswFlatIndexWriter),
77}
78
79impl VectorWriterImpl {
80 pub(crate) async fn new(
81 index: &VectorIndex,
82 sstable_store: SstableStoreRef,
83 object_id_manager: VectorObjectIdManagerRef,
84 storage_opts: &StorageOpts,
85 ) -> HummockResult<Self> {
86 Ok(match &index.inner {
87 VectorIndexImpl::Flat(flat) => VectorWriterImpl::Flat(FlatIndexWriter::new(
88 flat,
89 index.dimension,
90 sstable_store,
91 object_id_manager,
92 storage_opts,
93 )),
94 VectorIndexImpl::HnswFlat(hnsw_flat) => VectorWriterImpl::HnswFlat(
95 HnswFlatIndexWriter::new(
96 hnsw_flat,
97 index.dimension,
98 DistanceMeasurement::from(index.distance_type),
99 sstable_store,
100 object_id_manager,
101 storage_opts,
102 )
103 .await?,
104 ),
105 })
106 }
107
108 pub(crate) fn insert(&mut self, vec: Vector, info: Bytes) -> HummockResult<()> {
109 match self {
110 VectorWriterImpl::Flat(writer) => writer.insert(vec, info),
111 VectorWriterImpl::HnswFlat(writer) => writer.insert(vec, info),
112 }
113 }
114
115 pub(crate) fn seal_current_epoch(&mut self) -> Option<VectorIndexAdd> {
116 match self {
117 VectorWriterImpl::Flat(writer) => writer.seal_current_epoch(),
118 VectorWriterImpl::HnswFlat(writer) => {
119 writer.seal_current_epoch().map(VectorIndexAdd::HnswFlat)
120 }
121 }
122 }
123
124 pub(crate) async fn flush(&mut self) -> HummockResult<usize> {
125 match self {
126 VectorWriterImpl::Flat(writer) => writer.flush().await,
127 VectorWriterImpl::HnswFlat(writer) => writer.flush().await,
128 }
129 }
130
131 pub(crate) async fn try_flush(&mut self) -> HummockResult<()> {
132 match self {
133 VectorWriterImpl::Flat(writer) => writer.try_flush().await,
134 VectorWriterImpl::HnswFlat(writer) => writer.try_flush().await,
135 }
136 }
137}
138
139pub(crate) struct FlatIndexWriter {
140 flushed_vector_files: Vec<VectorFileInfo>,
141 sstable_store: SstableStoreRef,
142 vector_file_builder: VectorFileBuilder,
143}
144
145impl FlatIndexWriter {
146 pub(crate) fn new(
147 index: &FlatIndex,
148 dimension: usize,
149 sstable_store: SstableStoreRef,
150 object_id_manager: VectorObjectIdManagerRef,
151 storage_opts: &StorageOpts,
152 ) -> Self {
153 Self {
154 flushed_vector_files: vec![],
155 sstable_store: sstable_store.clone(),
156 vector_file_builder: new_vector_file_builder(
157 dimension,
158 index.vector_store_info.next_vector_id,
159 sstable_store,
160 object_id_manager,
161 storage_opts,
162 ),
163 }
164 }
165
166 pub(crate) fn insert(&mut self, vec: Vector, info: Bytes) -> HummockResult<()> {
167 self.vector_file_builder.add(vec.to_ref(), info.as_ref());
168 Ok(())
169 }
170
171 pub(crate) fn seal_current_epoch(&mut self) -> Option<VectorIndexAdd> {
172 assert!(self.vector_file_builder.is_empty());
173 if self.flushed_vector_files.is_empty() {
174 return None;
175 }
176 Some(VectorIndexAdd::Flat(FlatIndexAdd {
177 vector_store_info_delta: VectorStoreInfoDelta {
178 next_vector_id: self.vector_file_builder.next_vector_id(),
179 added_vector_files: take(&mut self.flushed_vector_files),
180 },
181 }))
182 }
183
184 pub(crate) async fn flush(&mut self) -> HummockResult<usize> {
185 if let Some((file_info, blocks, meta)) = self.vector_file_builder.finish().await? {
186 self.sstable_store
187 .insert_vector_cache(file_info.object_id, meta, blocks);
188 let size = file_info.file_size as _;
189 self.flushed_vector_files.push(file_info);
190 Ok(size)
191 } else {
192 Ok(0)
193 }
194 }
195
196 pub(crate) async fn try_flush(&mut self) -> HummockResult<()> {
197 self.vector_file_builder.try_flush().await
198 }
199}