risingwave_storage/hummock/store/
vector_writer.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use bytes::Bytes;
18use risingwave_common::catalog::TableId;
19use risingwave_common::util::epoch::EpochPair;
20use risingwave_hummock_sdk::HummockEpoch;
21
22use crate::error::StorageResult;
23use crate::hummock::event_handler::HummockEvent;
24use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
25use crate::hummock::local_version::pinned_version::PinnedVersion;
26use crate::hummock::utils::wait_for_epoch;
27use crate::hummock::vector::writer::VectorWriterImpl;
28use crate::hummock::{HummockError, HummockResult, ObjectIdManagerRef, SstableStoreRef};
29use crate::opts::StorageOpts;
30use crate::store::*;
31
32struct VectorWriterInitGuard {
33    table_id: TableId,
34    hummock_event_sender: HummockEventSender,
35}
36
37impl VectorWriterInitGuard {
38    fn new(
39        table_id: TableId,
40        init_epoch: HummockEpoch,
41        hummock_event_sender: HummockEventSender,
42    ) -> HummockResult<Self> {
43        hummock_event_sender
44            .send(HummockEvent::RegisterVectorWriter {
45                table_id,
46                init_epoch,
47            })
48            .map_err(|_| HummockError::other("failed to send register vector writer event"))?;
49        Ok(Self {
50            table_id,
51            hummock_event_sender,
52        })
53    }
54}
55
56impl Drop for VectorWriterInitGuard {
57    fn drop(&mut self) {
58        let _ = self
59            .hummock_event_sender
60            .send(HummockEvent::DropVectorWriter {
61                table_id: self.table_id,
62            });
63    }
64}
65
66struct VectorWriterState {
67    epoch: EpochPair,
68    writer_impl: VectorWriterImpl,
69    _guard: VectorWriterInitGuard,
70}
71
72pub struct HummockVectorWriter {
73    table_id: TableId,
74    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
75    sstable_store: SstableStoreRef,
76    object_id_manager: ObjectIdManagerRef,
77    hummock_event_sender: HummockEventSender,
78    storage_opts: Arc<StorageOpts>,
79
80    state: Option<VectorWriterState>,
81}
82
83impl HummockVectorWriter {
84    pub(super) fn new(
85        table_id: TableId,
86        version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
87        sstable_store: SstableStoreRef,
88        object_id_manager: ObjectIdManagerRef,
89        hummock_event_sender: HummockEventSender,
90        storage_opts: Arc<StorageOpts>,
91    ) -> Self {
92        Self {
93            table_id,
94            version_update_notifier_tx,
95            sstable_store,
96            object_id_manager,
97            hummock_event_sender,
98            storage_opts,
99            state: None,
100        }
101    }
102}
103
104impl StateStoreWriteEpochControl for HummockVectorWriter {
105    async fn init(&mut self, opts: InitOptions) -> StorageResult<()> {
106        let version = wait_for_epoch(
107            &self.version_update_notifier_tx,
108            opts.epoch.prev,
109            self.table_id,
110        )
111        .await?;
112        let index = &version.vector_indexes[&self.table_id];
113        assert!(
114            self.state
115                .replace(VectorWriterState {
116                    epoch: opts.epoch,
117                    writer_impl: VectorWriterImpl::new(
118                        index,
119                        self.sstable_store.clone(),
120                        self.object_id_manager.clone(),
121                        &self.storage_opts,
122                    ),
123                    _guard: VectorWriterInitGuard::new(
124                        self.table_id,
125                        opts.epoch.curr,
126                        self.hummock_event_sender.clone()
127                    )?,
128                })
129                .is_none()
130        );
131        Ok(())
132    }
133
134    fn seal_current_epoch(&mut self, next_epoch: u64, _opts: SealCurrentEpochOptions) {
135        let state = self.state.as_mut().expect("should have init");
136        let epoch = &mut state.epoch;
137        assert!(next_epoch > epoch.curr);
138        epoch.prev = epoch.curr;
139        epoch.curr = next_epoch;
140        let _ = self
141            .hummock_event_sender
142            .send(HummockEvent::VectorWriterSealEpoch {
143                table_id: self.table_id,
144                next_epoch,
145                add: state.writer_impl.seal_current_epoch(),
146            });
147    }
148
149    async fn flush(&mut self) -> StorageResult<usize> {
150        Ok(self
151            .state
152            .as_mut()
153            .expect("should have init")
154            .writer_impl
155            .flush()
156            .await?)
157    }
158
159    async fn try_flush(&mut self) -> StorageResult<()> {
160        Ok(self
161            .state
162            .as_mut()
163            .expect("should have init")
164            .writer_impl
165            .try_flush()
166            .await?)
167    }
168}
169
170impl StateStoreWriteVector for HummockVectorWriter {
171    fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()> {
172        Ok(self
173            .state
174            .as_mut()
175            .expect("should have init")
176            .writer_impl
177            .insert(vec, info)?)
178    }
179}