risingwave_storage/hummock/store/
vector_writer.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use bytes::Bytes;
18use risingwave_common::array::VectorRef;
19use risingwave_common::catalog::TableId;
20use risingwave_common::util::epoch::EpochPair;
21use risingwave_hummock_sdk::HummockEpoch;
22
23use crate::error::StorageResult;
24use crate::hummock::event_handler::HummockEvent;
25use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
26use crate::hummock::local_version::pinned_version::PinnedVersion;
27use crate::hummock::utils::wait_for_epoch;
28use crate::hummock::vector::writer::VectorWriterImpl;
29use crate::hummock::{HummockError, HummockResult, ObjectIdManagerRef, SstableStoreRef};
30use crate::opts::StorageOpts;
31use crate::store::*;
32
33struct VectorWriterInitGuard {
34    table_id: TableId,
35    hummock_event_sender: HummockEventSender,
36}
37
38impl VectorWriterInitGuard {
39    fn new(
40        table_id: TableId,
41        init_epoch: HummockEpoch,
42        hummock_event_sender: HummockEventSender,
43    ) -> HummockResult<Self> {
44        hummock_event_sender
45            .send(HummockEvent::RegisterVectorWriter {
46                table_id,
47                init_epoch,
48            })
49            .map_err(|_| HummockError::other("failed to send register vector writer event"))?;
50        Ok(Self {
51            table_id,
52            hummock_event_sender,
53        })
54    }
55}
56
57impl Drop for VectorWriterInitGuard {
58    fn drop(&mut self) {
59        let _ = self
60            .hummock_event_sender
61            .send(HummockEvent::DropVectorWriter {
62                table_id: self.table_id,
63            });
64    }
65}
66
67struct VectorWriterState {
68    epoch: EpochPair,
69    writer_impl: VectorWriterImpl,
70    _guard: VectorWriterInitGuard,
71}
72
73pub struct HummockVectorWriter {
74    table_id: TableId,
75    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
76    sstable_store: SstableStoreRef,
77    object_id_manager: ObjectIdManagerRef,
78    hummock_event_sender: HummockEventSender,
79    storage_opts: Arc<StorageOpts>,
80
81    state: Option<VectorWriterState>,
82}
83
84impl HummockVectorWriter {
85    pub(super) fn new(
86        table_id: TableId,
87        version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
88        sstable_store: SstableStoreRef,
89        object_id_manager: ObjectIdManagerRef,
90        hummock_event_sender: HummockEventSender,
91        storage_opts: Arc<StorageOpts>,
92    ) -> Self {
93        Self {
94            table_id,
95            version_update_notifier_tx,
96            sstable_store,
97            object_id_manager,
98            hummock_event_sender,
99            storage_opts,
100            state: None,
101        }
102    }
103}
104
105impl StateStoreWriteEpochControl for HummockVectorWriter {
106    async fn init(&mut self, opts: InitOptions) -> StorageResult<()> {
107        let version = wait_for_epoch(
108            &self.version_update_notifier_tx,
109            opts.epoch.prev,
110            self.table_id,
111        )
112        .await?;
113        let index = &version.vector_indexes.get(&self.table_id).ok_or_else(|| {
114            HummockError::other(format!("vector index not found: {}", self.table_id))
115        })?;
116        assert!(
117            self.state
118                .replace(VectorWriterState {
119                    epoch: opts.epoch,
120                    writer_impl: VectorWriterImpl::new(
121                        index,
122                        self.sstable_store.clone(),
123                        self.object_id_manager.clone(),
124                        &self.storage_opts,
125                    )
126                    .await?,
127                    _guard: VectorWriterInitGuard::new(
128                        self.table_id,
129                        opts.epoch.curr,
130                        self.hummock_event_sender.clone()
131                    )?,
132                })
133                .is_none()
134        );
135        Ok(())
136    }
137
138    fn seal_current_epoch(&mut self, next_epoch: u64, _opts: SealCurrentEpochOptions) {
139        let state = self.state.as_mut().expect("should have init");
140        let epoch = &mut state.epoch;
141        assert!(next_epoch > epoch.curr);
142        epoch.prev = epoch.curr;
143        epoch.curr = next_epoch;
144        let _ = self
145            .hummock_event_sender
146            .send(HummockEvent::VectorWriterSealEpoch {
147                table_id: self.table_id,
148                next_epoch,
149                add: state.writer_impl.seal_current_epoch(),
150            });
151    }
152
153    async fn flush(&mut self) -> StorageResult<usize> {
154        Ok(self
155            .state
156            .as_mut()
157            .expect("should have init")
158            .writer_impl
159            .flush()
160            .await?)
161    }
162
163    async fn try_flush(&mut self) -> StorageResult<()> {
164        Ok(self
165            .state
166            .as_mut()
167            .expect("should have init")
168            .writer_impl
169            .try_flush()
170            .await?)
171    }
172}
173
174impl StateStoreWriteVector for HummockVectorWriter {
175    fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()> {
176        Ok(self
177            .state
178            .as_mut()
179            .expect("should have init")
180            .writer_impl
181            .insert(vec, info)?)
182    }
183}