risingwave_storage/hummock/vector/writer/
mod.rs1use std::mem::take;
16use std::sync::Arc;
17
18use bytes::Bytes;
19use futures::FutureExt;
20use risingwave_hummock_sdk::vector_index::{
21 FlatIndexAdd, VectorFileInfo, VectorIndex, VectorIndexAdd, VectorIndexImpl,
22 VectorStoreInfoDelta,
23};
24use risingwave_hummock_sdk::{HummockObjectId, HummockRawObjectId};
25
26use crate::hummock::vector::file::VectorFileBuilder;
27use crate::hummock::{HummockResult, ObjectIdManager, ObjectIdManagerRef, SstableStoreRef};
28use crate::opts::StorageOpts;
29use crate::vector::Vector;
30
31#[async_trait::async_trait]
32pub trait VectorObjectIdManager: Send + Sync {
33 async fn get_new_vector_object_id(&self) -> HummockResult<HummockRawObjectId>;
34}
35
36pub type VectorObjectIdManagerRef = Arc<dyn VectorObjectIdManager>;
37
38#[async_trait::async_trait]
39impl VectorObjectIdManager for ObjectIdManager {
40 async fn get_new_vector_object_id(&self) -> HummockResult<HummockRawObjectId> {
41 self.get_new_object_id().await
42 }
43}
44
45pub(crate) fn new_vector_file_builder(
46 dimension: usize,
47 next_vector_id: usize,
48 sstable_store: SstableStoreRef,
49 object_id_manager: VectorObjectIdManagerRef,
50 storage_opts: &StorageOpts,
51) -> VectorFileBuilder {
52 VectorFileBuilder::new(
53 dimension,
54 Box::new(move || {
55 let object_id_manager = object_id_manager.clone();
56 let sstable_store = sstable_store.clone();
57 async move {
58 let object_id = object_id_manager.get_new_vector_object_id().await?.into();
59 let path =
60 sstable_store.get_object_data_path(HummockObjectId::VectorFile(object_id));
61 let uploader = sstable_store.create_streaming_uploader(&path).await?;
62 Ok((object_id, uploader))
63 }
64 .boxed()
65 }),
66 next_vector_id,
67 storage_opts.vector_file_block_size_kb * 1024,
68 )
69}
70
71pub(crate) struct VectorWriterImpl {
72 flushed_vector_files: Vec<VectorFileInfo>,
73 sstable_store: SstableStoreRef,
74 vector_file_builder: VectorFileBuilder,
75}
76
77impl VectorWriterImpl {
78 pub(crate) fn new(
79 index: &VectorIndex,
80 sstable_store: SstableStoreRef,
81 object_id_manager: ObjectIdManagerRef,
82 storage_opts: &StorageOpts,
83 ) -> Self {
84 let VectorIndexImpl::Flat(flat_index) = &index.inner;
85 Self {
86 flushed_vector_files: vec![],
87 sstable_store: sstable_store.clone(),
88 vector_file_builder: new_vector_file_builder(
89 index.dimension,
90 flat_index.vector_store_info.next_vector_id,
91 sstable_store,
92 object_id_manager,
93 storage_opts,
94 ),
95 }
96 }
97
98 pub(crate) fn insert(&mut self, vec: Vector, info: Bytes) -> HummockResult<()> {
99 self.vector_file_builder.add(vec.to_ref(), info.as_ref());
100 Ok(())
101 }
102
103 pub(crate) fn seal_current_epoch(&mut self) -> Option<VectorIndexAdd> {
104 assert!(self.vector_file_builder.is_empty());
105 if self.flushed_vector_files.is_empty() {
106 return None;
107 }
108 Some(VectorIndexAdd::Flat(FlatIndexAdd {
109 vector_store_info_delta: VectorStoreInfoDelta {
110 next_vector_id: self.vector_file_builder.next_vector_id(),
111 added_vector_files: take(&mut self.flushed_vector_files),
112 },
113 }))
114 }
115
116 pub(crate) async fn flush(&mut self) -> HummockResult<usize> {
117 if let Some((file_info, blocks, meta)) = self.vector_file_builder.finish().await? {
118 self.sstable_store
119 .insert_vector_cache(file_info.object_id, meta, blocks);
120 let size = file_info.file_size as _;
121 self.flushed_vector_files.push(file_info);
122 Ok(size)
123 } else {
124 Ok(0)
125 }
126 }
127
128 pub(crate) async fn try_flush(&mut self) -> HummockResult<()> {
129 self.vector_file_builder.try_flush().await
130 }
131}