risingwave_storage/hummock/vector/writer/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod hnsw;
16use std::mem::take;
17use std::sync::Arc;
18
19use bytes::Bytes;
20use futures::FutureExt;
21use hnsw::HnswFlatIndexWriter;
22use risingwave_common::array::VectorRef;
23use risingwave_common::catalog::TableId;
24use risingwave_common::vector::distance::DistanceMeasurement;
25use risingwave_hummock_sdk::vector_index::{
26    FlatIndex, FlatIndexAdd, VectorFileInfo, VectorIndex, VectorIndexAdd, VectorIndexImpl,
27    VectorStoreInfoDelta,
28};
29use risingwave_hummock_sdk::{HummockObjectId, HummockRawObjectId};
30
31use crate::hummock::vector::file::VectorFileBuilder;
32use crate::hummock::{HummockResult, ObjectIdManager, SstableStoreRef};
33use crate::monitor::HummockStateStoreMetrics;
34use crate::opts::StorageOpts;
35
/// Source of new raw object ids for vector objects.
///
/// Implementors must be `Send + Sync` so that they can be shared behind a
/// [`VectorObjectIdManagerRef`].
#[async_trait::async_trait]
pub trait VectorObjectIdManager: Send + Sync {
    /// Asynchronously obtains a new [`HummockRawObjectId`].
    ///
    /// # Errors
    ///
    /// Returns the implementor's error when no id can be obtained.
    async fn get_new_vector_object_id(&self) -> HummockResult<HummockRawObjectId>;
}
40
/// Shared, dynamically dispatched handle to a [`VectorObjectIdManager`].
pub type VectorObjectIdManagerRef = Arc<dyn VectorObjectIdManager>;
42
43#[async_trait::async_trait]
44impl VectorObjectIdManager for ObjectIdManager {
45    async fn get_new_vector_object_id(&self) -> HummockResult<HummockRawObjectId> {
46        self.get_new_object_id().await
47    }
48}
49
50pub(crate) fn new_vector_file_builder(
51    dimension: usize,
52    next_vector_id: usize,
53    sstable_store: SstableStoreRef,
54    object_id_manager: VectorObjectIdManagerRef,
55    storage_opts: &StorageOpts,
56) -> VectorFileBuilder {
57    VectorFileBuilder::new(
58        dimension,
59        Box::new(move || {
60            let object_id_manager = object_id_manager.clone();
61            let sstable_store = sstable_store.clone();
62            async move {
63                let object_id = object_id_manager.get_new_vector_object_id().await?.into();
64                let path =
65                    sstable_store.get_object_data_path(HummockObjectId::VectorFile(object_id));
66                let uploader = sstable_store.create_streaming_uploader(&path).await?;
67                Ok((object_id, uploader))
68            }
69            .boxed()
70        }),
71        next_vector_id,
72        storage_opts.vector_file_block_size_kb * 1024,
73    )
74}
75
/// Vector index writer that dispatches to the writer of a concrete index kind.
pub(crate) enum VectorWriterImpl {
    /// Writer used for a `VectorIndexImpl::Flat` index.
    Flat(FlatIndexWriter),
    /// Writer used for a `VectorIndexImpl::HnswFlat` index.
    HnswFlat(HnswFlatIndexWriter),
}
80
81impl VectorWriterImpl {
82    pub(crate) async fn new(
83        table_id: TableId,
84        index: &VectorIndex,
85        sstable_store: SstableStoreRef,
86        object_id_manager: VectorObjectIdManagerRef,
87        stats: Arc<HummockStateStoreMetrics>,
88        storage_opts: &StorageOpts,
89    ) -> HummockResult<Self> {
90        Ok(match &index.inner {
91            VectorIndexImpl::Flat(flat) => VectorWriterImpl::Flat(FlatIndexWriter::new(
92                flat,
93                index.dimension,
94                sstable_store,
95                object_id_manager,
96                storage_opts,
97            )),
98            VectorIndexImpl::HnswFlat(hnsw_flat) => VectorWriterImpl::HnswFlat(
99                HnswFlatIndexWriter::new(
100                    table_id,
101                    hnsw_flat,
102                    index.dimension,
103                    DistanceMeasurement::from(index.distance_type),
104                    sstable_store,
105                    object_id_manager,
106                    stats,
107                    storage_opts,
108                )
109                .await?,
110            ),
111        })
112    }
113
114    pub(crate) fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> HummockResult<()> {
115        match self {
116            VectorWriterImpl::Flat(writer) => writer.insert(vec, info),
117            VectorWriterImpl::HnswFlat(writer) => writer.insert(vec, info),
118        }
119    }
120
121    pub(crate) fn seal_current_epoch(&mut self) -> Option<VectorIndexAdd> {
122        match self {
123            VectorWriterImpl::Flat(writer) => writer.seal_current_epoch(),
124            VectorWriterImpl::HnswFlat(writer) => {
125                writer.seal_current_epoch().map(VectorIndexAdd::HnswFlat)
126            }
127        }
128    }
129
130    pub(crate) async fn flush(&mut self) -> HummockResult<usize> {
131        match self {
132            VectorWriterImpl::Flat(writer) => writer.flush().await,
133            VectorWriterImpl::HnswFlat(writer) => writer.flush().await,
134        }
135    }
136
137    pub(crate) async fn try_flush(&mut self) -> HummockResult<()> {
138        match self {
139            VectorWriterImpl::Flat(writer) => writer.try_flush().await,
140            VectorWriterImpl::HnswFlat(writer) => writer.try_flush().await,
141        }
142    }
143}
144
/// Writer for a flat vector index.
///
/// Inserted vectors go into a [`VectorFileBuilder`]; files completed by
/// `flush` are collected until `seal_current_epoch` hands them out.
pub(crate) struct FlatIndexWriter {
    /// Files finished by `flush` that `seal_current_epoch` has not yet taken.
    flushed_vector_files: Vec<VectorFileInfo>,
    /// Store whose vector cache is populated when a file is flushed.
    sstable_store: SstableStoreRef,
    /// Builder that receives the inserted vectors.
    vector_file_builder: VectorFileBuilder,
}
150
151impl FlatIndexWriter {
152    pub(crate) fn new(
153        index: &FlatIndex,
154        dimension: usize,
155        sstable_store: SstableStoreRef,
156        object_id_manager: VectorObjectIdManagerRef,
157        storage_opts: &StorageOpts,
158    ) -> Self {
159        Self {
160            flushed_vector_files: vec![],
161            sstable_store: sstable_store.clone(),
162            vector_file_builder: new_vector_file_builder(
163                dimension,
164                index.vector_store_info.next_vector_id,
165                sstable_store,
166                object_id_manager,
167                storage_opts,
168            ),
169        }
170    }
171
172    pub(crate) fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> HummockResult<()> {
173        self.vector_file_builder.add(vec, info.as_ref());
174        Ok(())
175    }
176
177    pub(crate) fn seal_current_epoch(&mut self) -> Option<VectorIndexAdd> {
178        assert!(self.vector_file_builder.is_empty());
179        if self.flushed_vector_files.is_empty() {
180            return None;
181        }
182        Some(VectorIndexAdd::Flat(FlatIndexAdd {
183            vector_store_info_delta: VectorStoreInfoDelta {
184                next_vector_id: self.vector_file_builder.next_vector_id(),
185                added_vector_files: take(&mut self.flushed_vector_files),
186            },
187        }))
188    }
189
190    pub(crate) async fn flush(&mut self) -> HummockResult<usize> {
191        if let Some((file_info, blocks, meta)) = self.vector_file_builder.finish().await? {
192            self.sstable_store
193                .insert_vector_cache(file_info.object_id, meta, blocks);
194            let size = file_info.file_size as _;
195            self.flushed_vector_files.push(file_info);
196            Ok(size)
197        } else {
198            Ok(0)
199        }
200    }
201
202    pub(crate) async fn try_flush(&mut self) -> HummockResult<()> {
203        self.vector_file_builder.try_flush().await
204    }
205}