risingwave_storage/hummock/store/
vector_writer.rs1use std::sync::Arc;
16
17use bytes::Bytes;
18use risingwave_common::array::VectorRef;
19use risingwave_common::catalog::TableId;
20use risingwave_common::util::epoch::EpochPair;
21use risingwave_hummock_sdk::HummockEpoch;
22
23use crate::error::StorageResult;
24use crate::hummock::event_handler::HummockEvent;
25use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
26use crate::hummock::local_version::pinned_version::PinnedVersion;
27use crate::hummock::utils::wait_for_epoch;
28use crate::hummock::vector::writer::VectorWriterImpl;
29use crate::hummock::{HummockError, HummockResult, ObjectIdManagerRef, SstableStoreRef};
30use crate::monitor::HummockStateStoreMetrics;
31use crate::opts::StorageOpts;
32use crate::store::*;
33
34struct VectorWriterInitGuard {
35 table_id: TableId,
36 hummock_event_sender: HummockEventSender,
37}
38
39impl VectorWriterInitGuard {
40 fn new(
41 table_id: TableId,
42 init_epoch: HummockEpoch,
43 hummock_event_sender: HummockEventSender,
44 ) -> HummockResult<Self> {
45 hummock_event_sender
46 .send(HummockEvent::RegisterVectorWriter {
47 table_id,
48 init_epoch,
49 })
50 .map_err(|_| HummockError::other("failed to send register vector writer event"))?;
51 Ok(Self {
52 table_id,
53 hummock_event_sender,
54 })
55 }
56}
57
58impl Drop for VectorWriterInitGuard {
59 fn drop(&mut self) {
60 let _ = self
61 .hummock_event_sender
62 .send(HummockEvent::DropVectorWriter {
63 table_id: self.table_id,
64 });
65 }
66}
67
68struct VectorWriterState {
69 epoch: EpochPair,
70 writer_impl: VectorWriterImpl,
71 _guard: VectorWriterInitGuard,
72}
73
74pub struct HummockVectorWriter {
75 table_id: TableId,
76 version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
77 sstable_store: SstableStoreRef,
78 object_id_manager: ObjectIdManagerRef,
79 hummock_event_sender: HummockEventSender,
80 stats: Arc<HummockStateStoreMetrics>,
81 storage_opts: Arc<StorageOpts>,
82
83 state: Option<VectorWriterState>,
84}
85
86impl HummockVectorWriter {
87 pub(super) fn new(
88 table_id: TableId,
89 version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
90 sstable_store: SstableStoreRef,
91 object_id_manager: ObjectIdManagerRef,
92 hummock_event_sender: HummockEventSender,
93 stats: Arc<HummockStateStoreMetrics>,
94 storage_opts: Arc<StorageOpts>,
95 ) -> Self {
96 Self {
97 table_id,
98 version_update_notifier_tx,
99 sstable_store,
100 object_id_manager,
101 hummock_event_sender,
102 stats,
103 storage_opts,
104 state: None,
105 }
106 }
107}
108
109impl StateStoreWriteEpochControl for HummockVectorWriter {
110 async fn init(&mut self, opts: InitOptions) -> StorageResult<()> {
111 let version = wait_for_epoch(
112 &self.version_update_notifier_tx,
113 opts.epoch.prev,
114 self.table_id,
115 )
116 .await?;
117 let index = &version.vector_indexes.get(&self.table_id).ok_or_else(|| {
118 HummockError::other(format!("vector index not found: {}", self.table_id))
119 })?;
120 assert!(
121 self.state
122 .replace(VectorWriterState {
123 epoch: opts.epoch,
124 writer_impl: VectorWriterImpl::new(
125 self.table_id,
126 index,
127 self.sstable_store.clone(),
128 self.object_id_manager.clone(),
129 self.stats.clone(),
130 &self.storage_opts,
131 )
132 .await?,
133 _guard: VectorWriterInitGuard::new(
134 self.table_id,
135 opts.epoch.curr,
136 self.hummock_event_sender.clone()
137 )?,
138 })
139 .is_none()
140 );
141 Ok(())
142 }
143
144 fn seal_current_epoch(&mut self, next_epoch: u64, _opts: SealCurrentEpochOptions) {
145 let state = self.state.as_mut().expect("should have init");
146 let epoch = &mut state.epoch;
147 assert!(next_epoch > epoch.curr);
148 epoch.prev = epoch.curr;
149 epoch.curr = next_epoch;
150 let _ = self
151 .hummock_event_sender
152 .send(HummockEvent::VectorWriterSealEpoch {
153 table_id: self.table_id,
154 next_epoch,
155 add: state.writer_impl.seal_current_epoch(),
156 });
157 }
158
159 async fn flush(&mut self) -> StorageResult<usize> {
160 Ok(self
161 .state
162 .as_mut()
163 .expect("should have init")
164 .writer_impl
165 .flush()
166 .await?)
167 }
168
169 async fn try_flush(&mut self) -> StorageResult<()> {
170 Ok(self
171 .state
172 .as_mut()
173 .expect("should have init")
174 .writer_impl
175 .try_flush()
176 .await?)
177 }
178}
179
180impl StateStoreWriteVector for HummockVectorWriter {
181 fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()> {
182 Ok(self
183 .state
184 .as_mut()
185 .expect("should have init")
186 .writer_impl
187 .insert(vec, info)?)
188 }
189}