risingwave_storage/hummock/store/
vector_writer.rs1use std::sync::Arc;
16
17use bytes::Bytes;
18use risingwave_common::catalog::TableId;
19use risingwave_common::util::epoch::EpochPair;
20use risingwave_hummock_sdk::HummockEpoch;
21
22use crate::error::StorageResult;
23use crate::hummock::event_handler::HummockEvent;
24use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
25use crate::hummock::local_version::pinned_version::PinnedVersion;
26use crate::hummock::utils::wait_for_epoch;
27use crate::hummock::vector::writer::VectorWriterImpl;
28use crate::hummock::{HummockError, HummockResult, ObjectIdManagerRef, SstableStoreRef};
29use crate::opts::StorageOpts;
30use crate::store::*;
31
32struct VectorWriterInitGuard {
33 table_id: TableId,
34 hummock_event_sender: HummockEventSender,
35}
36
37impl VectorWriterInitGuard {
38 fn new(
39 table_id: TableId,
40 init_epoch: HummockEpoch,
41 hummock_event_sender: HummockEventSender,
42 ) -> HummockResult<Self> {
43 hummock_event_sender
44 .send(HummockEvent::RegisterVectorWriter {
45 table_id,
46 init_epoch,
47 })
48 .map_err(|_| HummockError::other("failed to send register vector writer event"))?;
49 Ok(Self {
50 table_id,
51 hummock_event_sender,
52 })
53 }
54}
55
56impl Drop for VectorWriterInitGuard {
57 fn drop(&mut self) {
58 let _ = self
59 .hummock_event_sender
60 .send(HummockEvent::DropVectorWriter {
61 table_id: self.table_id,
62 });
63 }
64}
65
66struct VectorWriterState {
67 epoch: EpochPair,
68 writer_impl: VectorWriterImpl,
69 _guard: VectorWriterInitGuard,
70}
71
72pub struct HummockVectorWriter {
73 table_id: TableId,
74 version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
75 sstable_store: SstableStoreRef,
76 object_id_manager: ObjectIdManagerRef,
77 hummock_event_sender: HummockEventSender,
78 storage_opts: Arc<StorageOpts>,
79
80 state: Option<VectorWriterState>,
81}
82
83impl HummockVectorWriter {
84 pub(super) fn new(
85 table_id: TableId,
86 version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
87 sstable_store: SstableStoreRef,
88 object_id_manager: ObjectIdManagerRef,
89 hummock_event_sender: HummockEventSender,
90 storage_opts: Arc<StorageOpts>,
91 ) -> Self {
92 Self {
93 table_id,
94 version_update_notifier_tx,
95 sstable_store,
96 object_id_manager,
97 hummock_event_sender,
98 storage_opts,
99 state: None,
100 }
101 }
102}
103
104impl StateStoreWriteEpochControl for HummockVectorWriter {
105 async fn init(&mut self, opts: InitOptions) -> StorageResult<()> {
106 let version = wait_for_epoch(
107 &self.version_update_notifier_tx,
108 opts.epoch.prev,
109 self.table_id,
110 )
111 .await?;
112 let index = &version.vector_indexes.get(&self.table_id).ok_or_else(|| {
113 HummockError::other(format!("vector index not found: {}", self.table_id))
114 })?;
115 assert!(
116 self.state
117 .replace(VectorWriterState {
118 epoch: opts.epoch,
119 writer_impl: VectorWriterImpl::new(
120 index,
121 self.sstable_store.clone(),
122 self.object_id_manager.clone(),
123 &self.storage_opts,
124 ),
125 _guard: VectorWriterInitGuard::new(
126 self.table_id,
127 opts.epoch.curr,
128 self.hummock_event_sender.clone()
129 )?,
130 })
131 .is_none()
132 );
133 Ok(())
134 }
135
136 fn seal_current_epoch(&mut self, next_epoch: u64, _opts: SealCurrentEpochOptions) {
137 let state = self.state.as_mut().expect("should have init");
138 let epoch = &mut state.epoch;
139 assert!(next_epoch > epoch.curr);
140 epoch.prev = epoch.curr;
141 epoch.curr = next_epoch;
142 let _ = self
143 .hummock_event_sender
144 .send(HummockEvent::VectorWriterSealEpoch {
145 table_id: self.table_id,
146 next_epoch,
147 add: state.writer_impl.seal_current_epoch(),
148 });
149 }
150
151 async fn flush(&mut self) -> StorageResult<usize> {
152 Ok(self
153 .state
154 .as_mut()
155 .expect("should have init")
156 .writer_impl
157 .flush()
158 .await?)
159 }
160
161 async fn try_flush(&mut self) -> StorageResult<()> {
162 Ok(self
163 .state
164 .as_mut()
165 .expect("should have init")
166 .writer_impl
167 .try_flush()
168 .await?)
169 }
170}
171
172impl StateStoreWriteVector for HummockVectorWriter {
173 fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()> {
174 Ok(self
175 .state
176 .as_mut()
177 .expect("should have init")
178 .writer_impl
179 .insert(vec, info)?)
180 }
181}