risingwave_storage/hummock/store/
vector_writer.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use bytes::Bytes;
18use risingwave_common::catalog::TableId;
19use risingwave_common::util::epoch::EpochPair;
20use risingwave_hummock_sdk::HummockEpoch;
21
22use crate::error::StorageResult;
23use crate::hummock::event_handler::HummockEvent;
24use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
25use crate::hummock::local_version::pinned_version::PinnedVersion;
26use crate::hummock::utils::wait_for_epoch;
27use crate::hummock::vector::writer::VectorWriterImpl;
28use crate::hummock::{HummockError, HummockResult, ObjectIdManagerRef, SstableStoreRef};
29use crate::opts::StorageOpts;
30use crate::store::*;
31
32struct VectorWriterInitGuard {
33    table_id: TableId,
34    hummock_event_sender: HummockEventSender,
35}
36
37impl VectorWriterInitGuard {
38    fn new(
39        table_id: TableId,
40        init_epoch: HummockEpoch,
41        hummock_event_sender: HummockEventSender,
42    ) -> HummockResult<Self> {
43        hummock_event_sender
44            .send(HummockEvent::RegisterVectorWriter {
45                table_id,
46                init_epoch,
47            })
48            .map_err(|_| HummockError::other("failed to send register vector writer event"))?;
49        Ok(Self {
50            table_id,
51            hummock_event_sender,
52        })
53    }
54}
55
56impl Drop for VectorWriterInitGuard {
57    fn drop(&mut self) {
58        let _ = self
59            .hummock_event_sender
60            .send(HummockEvent::DropVectorWriter {
61                table_id: self.table_id,
62            });
63    }
64}
65
66struct VectorWriterState {
67    epoch: EpochPair,
68    writer_impl: VectorWriterImpl,
69    _guard: VectorWriterInitGuard,
70}
71
72pub struct HummockVectorWriter {
73    table_id: TableId,
74    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
75    sstable_store: SstableStoreRef,
76    object_id_manager: ObjectIdManagerRef,
77    hummock_event_sender: HummockEventSender,
78    storage_opts: Arc<StorageOpts>,
79
80    state: Option<VectorWriterState>,
81}
82
83impl HummockVectorWriter {
84    pub(super) fn new(
85        table_id: TableId,
86        version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
87        sstable_store: SstableStoreRef,
88        object_id_manager: ObjectIdManagerRef,
89        hummock_event_sender: HummockEventSender,
90        storage_opts: Arc<StorageOpts>,
91    ) -> Self {
92        Self {
93            table_id,
94            version_update_notifier_tx,
95            sstable_store,
96            object_id_manager,
97            hummock_event_sender,
98            storage_opts,
99            state: None,
100        }
101    }
102}
103
104impl StateStoreWriteEpochControl for HummockVectorWriter {
105    async fn init(&mut self, opts: InitOptions) -> StorageResult<()> {
106        let version = wait_for_epoch(
107            &self.version_update_notifier_tx,
108            opts.epoch.prev,
109            self.table_id,
110        )
111        .await?;
112        let index = &version.vector_indexes.get(&self.table_id).ok_or_else(|| {
113            HummockError::other(format!("vector index not found: {}", self.table_id))
114        })?;
115        assert!(
116            self.state
117                .replace(VectorWriterState {
118                    epoch: opts.epoch,
119                    writer_impl: VectorWriterImpl::new(
120                        index,
121                        self.sstable_store.clone(),
122                        self.object_id_manager.clone(),
123                        &self.storage_opts,
124                    ),
125                    _guard: VectorWriterInitGuard::new(
126                        self.table_id,
127                        opts.epoch.curr,
128                        self.hummock_event_sender.clone()
129                    )?,
130                })
131                .is_none()
132        );
133        Ok(())
134    }
135
136    fn seal_current_epoch(&mut self, next_epoch: u64, _opts: SealCurrentEpochOptions) {
137        let state = self.state.as_mut().expect("should have init");
138        let epoch = &mut state.epoch;
139        assert!(next_epoch > epoch.curr);
140        epoch.prev = epoch.curr;
141        epoch.curr = next_epoch;
142        let _ = self
143            .hummock_event_sender
144            .send(HummockEvent::VectorWriterSealEpoch {
145                table_id: self.table_id,
146                next_epoch,
147                add: state.writer_impl.seal_current_epoch(),
148            });
149    }
150
151    async fn flush(&mut self) -> StorageResult<usize> {
152        Ok(self
153            .state
154            .as_mut()
155            .expect("should have init")
156            .writer_impl
157            .flush()
158            .await?)
159    }
160
161    async fn try_flush(&mut self) -> StorageResult<()> {
162        Ok(self
163            .state
164            .as_mut()
165            .expect("should have init")
166            .writer_impl
167            .try_flush()
168            .await?)
169    }
170}
171
172impl StateStoreWriteVector for HummockVectorWriter {
173    fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()> {
174        Ok(self
175            .state
176            .as_mut()
177            .expect("should have init")
178            .writer_impl
179            .insert(vec, info)?)
180    }
181}