risingwave_storage/hummock/store/
vector_writer.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use bytes::Bytes;
18use risingwave_common::array::VectorRef;
19use risingwave_common::catalog::TableId;
20use risingwave_common::util::epoch::EpochPair;
21use risingwave_hummock_sdk::HummockEpoch;
22
23use crate::error::StorageResult;
24use crate::hummock::event_handler::HummockEvent;
25use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
26use crate::hummock::local_version::pinned_version::PinnedVersion;
27use crate::hummock::utils::wait_for_epoch;
28use crate::hummock::vector::writer::VectorWriterImpl;
29use crate::hummock::{HummockError, HummockResult, ObjectIdManagerRef, SstableStoreRef};
30use crate::monitor::HummockStateStoreMetrics;
31use crate::opts::StorageOpts;
32use crate::store::*;
33
34struct VectorWriterInitGuard {
35    table_id: TableId,
36    hummock_event_sender: HummockEventSender,
37}
38
39impl VectorWriterInitGuard {
40    fn new(
41        table_id: TableId,
42        init_epoch: HummockEpoch,
43        hummock_event_sender: HummockEventSender,
44    ) -> HummockResult<Self> {
45        hummock_event_sender
46            .send(HummockEvent::RegisterVectorWriter {
47                table_id,
48                init_epoch,
49            })
50            .map_err(|_| HummockError::other("failed to send register vector writer event"))?;
51        Ok(Self {
52            table_id,
53            hummock_event_sender,
54        })
55    }
56}
57
58impl Drop for VectorWriterInitGuard {
59    fn drop(&mut self) {
60        let _ = self
61            .hummock_event_sender
62            .send(HummockEvent::DropVectorWriter {
63                table_id: self.table_id,
64            });
65    }
66}
67
68struct VectorWriterState {
69    epoch: EpochPair,
70    writer_impl: VectorWriterImpl,
71    _guard: VectorWriterInitGuard,
72}
73
74pub struct HummockVectorWriter {
75    table_id: TableId,
76    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
77    sstable_store: SstableStoreRef,
78    object_id_manager: ObjectIdManagerRef,
79    hummock_event_sender: HummockEventSender,
80    stats: Arc<HummockStateStoreMetrics>,
81    storage_opts: Arc<StorageOpts>,
82
83    state: Option<VectorWriterState>,
84}
85
86impl HummockVectorWriter {
87    pub(super) fn new(
88        table_id: TableId,
89        version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
90        sstable_store: SstableStoreRef,
91        object_id_manager: ObjectIdManagerRef,
92        hummock_event_sender: HummockEventSender,
93        stats: Arc<HummockStateStoreMetrics>,
94        storage_opts: Arc<StorageOpts>,
95    ) -> Self {
96        Self {
97            table_id,
98            version_update_notifier_tx,
99            sstable_store,
100            object_id_manager,
101            hummock_event_sender,
102            stats,
103            storage_opts,
104            state: None,
105        }
106    }
107}
108
109impl StateStoreWriteEpochControl for HummockVectorWriter {
110    async fn init(&mut self, opts: InitOptions) -> StorageResult<()> {
111        let version = wait_for_epoch(
112            &self.version_update_notifier_tx,
113            opts.epoch.prev,
114            self.table_id,
115        )
116        .await?;
117        let index = &version.vector_indexes.get(&self.table_id).ok_or_else(|| {
118            HummockError::other(format!("vector index not found: {}", self.table_id))
119        })?;
120        assert!(
121            self.state
122                .replace(VectorWriterState {
123                    epoch: opts.epoch,
124                    writer_impl: VectorWriterImpl::new(
125                        self.table_id,
126                        index,
127                        self.sstable_store.clone(),
128                        self.object_id_manager.clone(),
129                        self.stats.clone(),
130                        &self.storage_opts,
131                    )
132                    .await?,
133                    _guard: VectorWriterInitGuard::new(
134                        self.table_id,
135                        opts.epoch.curr,
136                        self.hummock_event_sender.clone()
137                    )?,
138                })
139                .is_none()
140        );
141        Ok(())
142    }
143
144    fn seal_current_epoch(&mut self, next_epoch: u64, _opts: SealCurrentEpochOptions) {
145        let state = self.state.as_mut().expect("should have init");
146        let epoch = &mut state.epoch;
147        assert!(next_epoch > epoch.curr);
148        epoch.prev = epoch.curr;
149        epoch.curr = next_epoch;
150        let _ = self
151            .hummock_event_sender
152            .send(HummockEvent::VectorWriterSealEpoch {
153                table_id: self.table_id,
154                next_epoch,
155                add: state.writer_impl.seal_current_epoch(),
156            });
157    }
158
159    async fn flush(&mut self) -> StorageResult<usize> {
160        Ok(self
161            .state
162            .as_mut()
163            .expect("should have init")
164            .writer_impl
165            .flush()
166            .await?)
167    }
168
169    async fn try_flush(&mut self) -> StorageResult<()> {
170        Ok(self
171            .state
172            .as_mut()
173            .expect("should have init")
174            .writer_impl
175            .try_flush()
176            .await?)
177    }
178}
179
180impl StateStoreWriteVector for HummockVectorWriter {
181    fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()> {
182        Ok(self
183            .state
184            .as_mut()
185            .expect("should have init")
186            .writer_impl
187            .insert(vec, info)?)
188    }
189}