risingwave_storage/hummock/store/
vector_writer.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16
17use bytes::Bytes;
18use risingwave_common::catalog::TableId;
19use risingwave_common::util::epoch::EpochPair;
20use risingwave_hummock_sdk::HummockEpoch;
21
22use crate::error::StorageResult;
23use crate::hummock::event_handler::HummockEvent;
24use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
25use crate::hummock::local_version::pinned_version::PinnedVersion;
26use crate::hummock::utils::wait_for_epoch;
27use crate::hummock::vector::writer::VectorWriterImpl;
28use crate::hummock::{HummockError, HummockResult, ObjectIdManagerRef, SstableStoreRef};
29use crate::opts::StorageOpts;
30use crate::store::*;
31
32struct VectorWriterInitGuard {
33    table_id: TableId,
34    hummock_event_sender: HummockEventSender,
35}
36
37impl VectorWriterInitGuard {
38    fn new(
39        table_id: TableId,
40        init_epoch: HummockEpoch,
41        hummock_event_sender: HummockEventSender,
42    ) -> HummockResult<Self> {
43        hummock_event_sender
44            .send(HummockEvent::RegisterVectorWriter {
45                table_id,
46                init_epoch,
47            })
48            .map_err(|_| HummockError::other("failed to send register vector writer event"))?;
49        Ok(Self {
50            table_id,
51            hummock_event_sender,
52        })
53    }
54}
55
56impl Drop for VectorWriterInitGuard {
57    fn drop(&mut self) {
58        let _ = self
59            .hummock_event_sender
60            .send(HummockEvent::DropVectorWriter {
61                table_id: self.table_id,
62            });
63    }
64}
65
66struct VectorWriterState {
67    epoch: EpochPair,
68    writer_impl: VectorWriterImpl,
69    _guard: VectorWriterInitGuard,
70}
71
72pub struct HummockVectorWriter {
73    table_id: TableId,
74    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
75    sstable_store: SstableStoreRef,
76    object_id_manager: ObjectIdManagerRef,
77    hummock_event_sender: HummockEventSender,
78    storage_opts: Arc<StorageOpts>,
79
80    state: Option<VectorWriterState>,
81}
82
83impl HummockVectorWriter {
84    pub(super) fn new(
85        table_id: TableId,
86        version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
87        sstable_store: SstableStoreRef,
88        object_id_manager: ObjectIdManagerRef,
89        hummock_event_sender: HummockEventSender,
90        storage_opts: Arc<StorageOpts>,
91    ) -> Self {
92        Self {
93            table_id,
94            version_update_notifier_tx,
95            sstable_store,
96            object_id_manager,
97            hummock_event_sender,
98            storage_opts,
99            state: None,
100        }
101    }
102}
103
104impl StateStoreWriteEpochControl for HummockVectorWriter {
105    async fn init(&mut self, opts: InitOptions) -> StorageResult<()> {
106        let version = wait_for_epoch(
107            &self.version_update_notifier_tx,
108            opts.epoch.prev,
109            self.table_id,
110        )
111        .await?;
112        let index = &version.vector_indexes.get(&self.table_id).ok_or_else(|| {
113            HummockError::other(format!("vector index not found: {}", self.table_id))
114        })?;
115        assert!(
116            self.state
117                .replace(VectorWriterState {
118                    epoch: opts.epoch,
119                    writer_impl: VectorWriterImpl::new(
120                        index,
121                        self.sstable_store.clone(),
122                        self.object_id_manager.clone(),
123                        &self.storage_opts,
124                    )
125                    .await?,
126                    _guard: VectorWriterInitGuard::new(
127                        self.table_id,
128                        opts.epoch.curr,
129                        self.hummock_event_sender.clone()
130                    )?,
131                })
132                .is_none()
133        );
134        Ok(())
135    }
136
137    fn seal_current_epoch(&mut self, next_epoch: u64, _opts: SealCurrentEpochOptions) {
138        let state = self.state.as_mut().expect("should have init");
139        let epoch = &mut state.epoch;
140        assert!(next_epoch > epoch.curr);
141        epoch.prev = epoch.curr;
142        epoch.curr = next_epoch;
143        let _ = self
144            .hummock_event_sender
145            .send(HummockEvent::VectorWriterSealEpoch {
146                table_id: self.table_id,
147                next_epoch,
148                add: state.writer_impl.seal_current_epoch(),
149            });
150    }
151
152    async fn flush(&mut self) -> StorageResult<usize> {
153        Ok(self
154            .state
155            .as_mut()
156            .expect("should have init")
157            .writer_impl
158            .flush()
159            .await?)
160    }
161
162    async fn try_flush(&mut self) -> StorageResult<()> {
163        Ok(self
164            .state
165            .as_mut()
166            .expect("should have init")
167            .writer_impl
168            .try_flush()
169            .await?)
170    }
171}
172
173impl StateStoreWriteVector for HummockVectorWriter {
174    fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()> {
175        Ok(self
176            .state
177            .as_mut()
178            .expect("should have init")
179            .writer_impl
180            .insert(vec, info)?)
181    }
182}