risingwave_storage/hummock/store/
vector_writer.rs1use std::sync::Arc;
16
17use bytes::Bytes;
18use risingwave_common::array::VectorRef;
19use risingwave_common::catalog::TableId;
20use risingwave_common::util::epoch::EpochPair;
21use risingwave_hummock_sdk::HummockEpoch;
22
23use crate::error::StorageResult;
24use crate::hummock::event_handler::HummockEvent;
25use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
26use crate::hummock::local_version::pinned_version::PinnedVersion;
27use crate::hummock::utils::wait_for_epoch;
28use crate::hummock::vector::writer::VectorWriterImpl;
29use crate::hummock::{HummockError, HummockResult, ObjectIdManagerRef, SstableStoreRef};
30use crate::opts::StorageOpts;
31use crate::store::*;
32
33struct VectorWriterInitGuard {
34 table_id: TableId,
35 hummock_event_sender: HummockEventSender,
36}
37
38impl VectorWriterInitGuard {
39 fn new(
40 table_id: TableId,
41 init_epoch: HummockEpoch,
42 hummock_event_sender: HummockEventSender,
43 ) -> HummockResult<Self> {
44 hummock_event_sender
45 .send(HummockEvent::RegisterVectorWriter {
46 table_id,
47 init_epoch,
48 })
49 .map_err(|_| HummockError::other("failed to send register vector writer event"))?;
50 Ok(Self {
51 table_id,
52 hummock_event_sender,
53 })
54 }
55}
56
57impl Drop for VectorWriterInitGuard {
58 fn drop(&mut self) {
59 let _ = self
60 .hummock_event_sender
61 .send(HummockEvent::DropVectorWriter {
62 table_id: self.table_id,
63 });
64 }
65}
66
67struct VectorWriterState {
68 epoch: EpochPair,
69 writer_impl: VectorWriterImpl,
70 _guard: VectorWriterInitGuard,
71}
72
73pub struct HummockVectorWriter {
74 table_id: TableId,
75 version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
76 sstable_store: SstableStoreRef,
77 object_id_manager: ObjectIdManagerRef,
78 hummock_event_sender: HummockEventSender,
79 storage_opts: Arc<StorageOpts>,
80
81 state: Option<VectorWriterState>,
82}
83
84impl HummockVectorWriter {
85 pub(super) fn new(
86 table_id: TableId,
87 version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
88 sstable_store: SstableStoreRef,
89 object_id_manager: ObjectIdManagerRef,
90 hummock_event_sender: HummockEventSender,
91 storage_opts: Arc<StorageOpts>,
92 ) -> Self {
93 Self {
94 table_id,
95 version_update_notifier_tx,
96 sstable_store,
97 object_id_manager,
98 hummock_event_sender,
99 storage_opts,
100 state: None,
101 }
102 }
103}
104
105impl StateStoreWriteEpochControl for HummockVectorWriter {
106 async fn init(&mut self, opts: InitOptions) -> StorageResult<()> {
107 let version = wait_for_epoch(
108 &self.version_update_notifier_tx,
109 opts.epoch.prev,
110 self.table_id,
111 )
112 .await?;
113 let index = &version.vector_indexes.get(&self.table_id).ok_or_else(|| {
114 HummockError::other(format!("vector index not found: {}", self.table_id))
115 })?;
116 assert!(
117 self.state
118 .replace(VectorWriterState {
119 epoch: opts.epoch,
120 writer_impl: VectorWriterImpl::new(
121 index,
122 self.sstable_store.clone(),
123 self.object_id_manager.clone(),
124 &self.storage_opts,
125 )
126 .await?,
127 _guard: VectorWriterInitGuard::new(
128 self.table_id,
129 opts.epoch.curr,
130 self.hummock_event_sender.clone()
131 )?,
132 })
133 .is_none()
134 );
135 Ok(())
136 }
137
138 fn seal_current_epoch(&mut self, next_epoch: u64, _opts: SealCurrentEpochOptions) {
139 let state = self.state.as_mut().expect("should have init");
140 let epoch = &mut state.epoch;
141 assert!(next_epoch > epoch.curr);
142 epoch.prev = epoch.curr;
143 epoch.curr = next_epoch;
144 let _ = self
145 .hummock_event_sender
146 .send(HummockEvent::VectorWriterSealEpoch {
147 table_id: self.table_id,
148 next_epoch,
149 add: state.writer_impl.seal_current_epoch(),
150 });
151 }
152
153 async fn flush(&mut self) -> StorageResult<usize> {
154 Ok(self
155 .state
156 .as_mut()
157 .expect("should have init")
158 .writer_impl
159 .flush()
160 .await?)
161 }
162
163 async fn try_flush(&mut self) -> StorageResult<()> {
164 Ok(self
165 .state
166 .as_mut()
167 .expect("should have init")
168 .writer_impl
169 .try_flush()
170 .await?)
171 }
172}
173
174impl StateStoreWriteVector for HummockVectorWriter {
175 fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()> {
176 Ok(self
177 .state
178 .as_mut()
179 .expect("should have init")
180 .writer_impl
181 .insert(vec, info)?)
182 }
183}