risingwave_storage/hummock/vector/writer/
mod.rs1mod hnsw;
16use std::mem::take;
17use std::sync::Arc;
18
19use bytes::Bytes;
20use futures::FutureExt;
21use hnsw::HnswFlatIndexWriter;
22use risingwave_common::array::VectorRef;
23use risingwave_common::catalog::TableId;
24use risingwave_common::vector::distance::DistanceMeasurement;
25use risingwave_hummock_sdk::vector_index::{
26 FlatIndex, FlatIndexAdd, VectorFileInfo, VectorIndex, VectorIndexAdd, VectorIndexImpl,
27 VectorStoreInfoDelta,
28};
29use risingwave_hummock_sdk::{HummockObjectId, HummockRawObjectId};
30
31use crate::hummock::vector::file::VectorFileBuilder;
32use crate::hummock::{HummockResult, ObjectIdManager, SstableStoreRef};
33use crate::monitor::HummockStateStoreMetrics;
34use crate::opts::StorageOpts;
35
36#[async_trait::async_trait]
37pub trait VectorObjectIdManager: Send + Sync {
38 async fn get_new_vector_object_id(&self) -> HummockResult<HummockRawObjectId>;
39}
40
41pub type VectorObjectIdManagerRef = Arc<dyn VectorObjectIdManager>;
42
43#[async_trait::async_trait]
44impl VectorObjectIdManager for ObjectIdManager {
45 async fn get_new_vector_object_id(&self) -> HummockResult<HummockRawObjectId> {
46 self.get_new_object_id().await
47 }
48}
49
50pub(crate) fn new_vector_file_builder(
51 dimension: usize,
52 next_vector_id: usize,
53 sstable_store: SstableStoreRef,
54 object_id_manager: VectorObjectIdManagerRef,
55 storage_opts: &StorageOpts,
56) -> VectorFileBuilder {
57 VectorFileBuilder::new(
58 dimension,
59 Box::new(move || {
60 let object_id_manager = object_id_manager.clone();
61 let sstable_store = sstable_store.clone();
62 async move {
63 let object_id = object_id_manager.get_new_vector_object_id().await?.into();
64 let path =
65 sstable_store.get_object_data_path(HummockObjectId::VectorFile(object_id));
66 let uploader = sstable_store.create_streaming_uploader(&path).await?;
67 Ok((object_id, uploader))
68 }
69 .boxed()
70 }),
71 next_vector_id,
72 storage_opts.vector_file_block_size_kb * 1024,
73 )
74}
75
76pub(crate) enum VectorWriterImpl {
77 Flat(FlatIndexWriter),
78 HnswFlat(HnswFlatIndexWriter),
79}
80
81impl VectorWriterImpl {
82 pub(crate) async fn new(
83 table_id: TableId,
84 index: &VectorIndex,
85 sstable_store: SstableStoreRef,
86 object_id_manager: VectorObjectIdManagerRef,
87 stats: Arc<HummockStateStoreMetrics>,
88 storage_opts: &StorageOpts,
89 ) -> HummockResult<Self> {
90 Ok(match &index.inner {
91 VectorIndexImpl::Flat(flat) => VectorWriterImpl::Flat(FlatIndexWriter::new(
92 flat,
93 index.dimension,
94 sstable_store,
95 object_id_manager,
96 storage_opts,
97 )),
98 VectorIndexImpl::HnswFlat(hnsw_flat) => VectorWriterImpl::HnswFlat(
99 HnswFlatIndexWriter::new(
100 table_id,
101 hnsw_flat,
102 index.dimension,
103 DistanceMeasurement::from(index.distance_type),
104 sstable_store,
105 object_id_manager,
106 stats,
107 storage_opts,
108 )
109 .await?,
110 ),
111 })
112 }
113
114 pub(crate) fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> HummockResult<()> {
115 match self {
116 VectorWriterImpl::Flat(writer) => writer.insert(vec, info),
117 VectorWriterImpl::HnswFlat(writer) => writer.insert(vec, info),
118 }
119 }
120
121 pub(crate) fn seal_current_epoch(&mut self) -> Option<VectorIndexAdd> {
122 match self {
123 VectorWriterImpl::Flat(writer) => writer.seal_current_epoch(),
124 VectorWriterImpl::HnswFlat(writer) => {
125 writer.seal_current_epoch().map(VectorIndexAdd::HnswFlat)
126 }
127 }
128 }
129
130 pub(crate) async fn flush(&mut self) -> HummockResult<usize> {
131 match self {
132 VectorWriterImpl::Flat(writer) => writer.flush().await,
133 VectorWriterImpl::HnswFlat(writer) => writer.flush().await,
134 }
135 }
136
137 pub(crate) async fn try_flush(&mut self) -> HummockResult<()> {
138 match self {
139 VectorWriterImpl::Flat(writer) => writer.try_flush().await,
140 VectorWriterImpl::HnswFlat(writer) => writer.try_flush().await,
141 }
142 }
143}
144
145pub(crate) struct FlatIndexWriter {
146 flushed_vector_files: Vec<VectorFileInfo>,
147 sstable_store: SstableStoreRef,
148 vector_file_builder: VectorFileBuilder,
149}
150
151impl FlatIndexWriter {
152 pub(crate) fn new(
153 index: &FlatIndex,
154 dimension: usize,
155 sstable_store: SstableStoreRef,
156 object_id_manager: VectorObjectIdManagerRef,
157 storage_opts: &StorageOpts,
158 ) -> Self {
159 Self {
160 flushed_vector_files: vec![],
161 sstable_store: sstable_store.clone(),
162 vector_file_builder: new_vector_file_builder(
163 dimension,
164 index.vector_store_info.next_vector_id,
165 sstable_store,
166 object_id_manager,
167 storage_opts,
168 ),
169 }
170 }
171
172 pub(crate) fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> HummockResult<()> {
173 self.vector_file_builder.add(vec, info.as_ref());
174 Ok(())
175 }
176
177 pub(crate) fn seal_current_epoch(&mut self) -> Option<VectorIndexAdd> {
178 assert!(self.vector_file_builder.is_empty());
179 if self.flushed_vector_files.is_empty() {
180 return None;
181 }
182 Some(VectorIndexAdd::Flat(FlatIndexAdd {
183 vector_store_info_delta: VectorStoreInfoDelta {
184 next_vector_id: self.vector_file_builder.next_vector_id(),
185 added_vector_files: take(&mut self.flushed_vector_files),
186 },
187 }))
188 }
189
190 pub(crate) async fn flush(&mut self) -> HummockResult<usize> {
191 if let Some((file_info, blocks, meta)) = self.vector_file_builder.finish().await? {
192 self.sstable_store
193 .insert_vector_cache(file_info.object_id, meta, blocks);
194 let size = file_info.file_size as _;
195 self.flushed_vector_files.push(file_info);
196 Ok(size)
197 } else {
198 Ok(0)
199 }
200 }
201
202 pub(crate) async fn try_flush(&mut self) -> HummockResult<()> {
203 self.vector_file_builder.try_flush().await
204 }
205}