risingwave_storage/hummock/store/
vector_writer.rs1use std::sync::Arc;
16
17use bytes::Bytes;
18use risingwave_common::catalog::TableId;
19use risingwave_common::util::epoch::EpochPair;
20use risingwave_hummock_sdk::HummockEpoch;
21
22use crate::error::StorageResult;
23use crate::hummock::event_handler::HummockEvent;
24use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
25use crate::hummock::local_version::pinned_version::PinnedVersion;
26use crate::hummock::utils::wait_for_epoch;
27use crate::hummock::vector::writer::VectorWriterImpl;
28use crate::hummock::{HummockError, HummockResult, ObjectIdManagerRef, SstableStoreRef};
29use crate::opts::StorageOpts;
30use crate::store::*;
31
32struct VectorWriterInitGuard {
33 table_id: TableId,
34 hummock_event_sender: HummockEventSender,
35}
36
37impl VectorWriterInitGuard {
38 fn new(
39 table_id: TableId,
40 init_epoch: HummockEpoch,
41 hummock_event_sender: HummockEventSender,
42 ) -> HummockResult<Self> {
43 hummock_event_sender
44 .send(HummockEvent::RegisterVectorWriter {
45 table_id,
46 init_epoch,
47 })
48 .map_err(|_| HummockError::other("failed to send register vector writer event"))?;
49 Ok(Self {
50 table_id,
51 hummock_event_sender,
52 })
53 }
54}
55
56impl Drop for VectorWriterInitGuard {
57 fn drop(&mut self) {
58 let _ = self
59 .hummock_event_sender
60 .send(HummockEvent::DropVectorWriter {
61 table_id: self.table_id,
62 });
63 }
64}
65
66struct VectorWriterState {
67 epoch: EpochPair,
68 writer_impl: VectorWriterImpl,
69 _guard: VectorWriterInitGuard,
70}
71
72pub struct HummockVectorWriter {
73 table_id: TableId,
74 version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
75 sstable_store: SstableStoreRef,
76 object_id_manager: ObjectIdManagerRef,
77 hummock_event_sender: HummockEventSender,
78 storage_opts: Arc<StorageOpts>,
79
80 state: Option<VectorWriterState>,
81}
82
83impl HummockVectorWriter {
84 pub(super) fn new(
85 table_id: TableId,
86 version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
87 sstable_store: SstableStoreRef,
88 object_id_manager: ObjectIdManagerRef,
89 hummock_event_sender: HummockEventSender,
90 storage_opts: Arc<StorageOpts>,
91 ) -> Self {
92 Self {
93 table_id,
94 version_update_notifier_tx,
95 sstable_store,
96 object_id_manager,
97 hummock_event_sender,
98 storage_opts,
99 state: None,
100 }
101 }
102}
103
104impl StateStoreWriteEpochControl for HummockVectorWriter {
105 async fn init(&mut self, opts: InitOptions) -> StorageResult<()> {
106 let version = wait_for_epoch(
107 &self.version_update_notifier_tx,
108 opts.epoch.prev,
109 self.table_id,
110 )
111 .await?;
112 let index = &version.vector_indexes.get(&self.table_id).ok_or_else(|| {
113 HummockError::other(format!("vector index not found: {}", self.table_id))
114 })?;
115 assert!(
116 self.state
117 .replace(VectorWriterState {
118 epoch: opts.epoch,
119 writer_impl: VectorWriterImpl::new(
120 index,
121 self.sstable_store.clone(),
122 self.object_id_manager.clone(),
123 &self.storage_opts,
124 )
125 .await?,
126 _guard: VectorWriterInitGuard::new(
127 self.table_id,
128 opts.epoch.curr,
129 self.hummock_event_sender.clone()
130 )?,
131 })
132 .is_none()
133 );
134 Ok(())
135 }
136
137 fn seal_current_epoch(&mut self, next_epoch: u64, _opts: SealCurrentEpochOptions) {
138 let state = self.state.as_mut().expect("should have init");
139 let epoch = &mut state.epoch;
140 assert!(next_epoch > epoch.curr);
141 epoch.prev = epoch.curr;
142 epoch.curr = next_epoch;
143 let _ = self
144 .hummock_event_sender
145 .send(HummockEvent::VectorWriterSealEpoch {
146 table_id: self.table_id,
147 next_epoch,
148 add: state.writer_impl.seal_current_epoch(),
149 });
150 }
151
152 async fn flush(&mut self) -> StorageResult<usize> {
153 Ok(self
154 .state
155 .as_mut()
156 .expect("should have init")
157 .writer_impl
158 .flush()
159 .await?)
160 }
161
162 async fn try_flush(&mut self) -> StorageResult<()> {
163 Ok(self
164 .state
165 .as_mut()
166 .expect("should have init")
167 .writer_impl
168 .try_flush()
169 .await?)
170 }
171}
172
173impl StateStoreWriteVector for HummockVectorWriter {
174 fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()> {
175 Ok(self
176 .state
177 .as_mut()
178 .expect("should have init")
179 .writer_impl
180 .insert(vec, info)?)
181 }
182}