risingwave_storage/hummock/store/
vector_writer.rs1use std::sync::Arc;
16
17use bytes::Bytes;
18use risingwave_common::catalog::TableId;
19use risingwave_common::util::epoch::EpochPair;
20use risingwave_hummock_sdk::HummockEpoch;
21
22use crate::error::StorageResult;
23use crate::hummock::event_handler::HummockEvent;
24use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
25use crate::hummock::local_version::pinned_version::PinnedVersion;
26use crate::hummock::utils::wait_for_epoch;
27use crate::hummock::vector::writer::VectorWriterImpl;
28use crate::hummock::{HummockError, HummockResult, ObjectIdManagerRef, SstableStoreRef};
29use crate::opts::StorageOpts;
30use crate::store::*;
31
32struct VectorWriterInitGuard {
33 table_id: TableId,
34 hummock_event_sender: HummockEventSender,
35}
36
37impl VectorWriterInitGuard {
38 fn new(
39 table_id: TableId,
40 init_epoch: HummockEpoch,
41 hummock_event_sender: HummockEventSender,
42 ) -> HummockResult<Self> {
43 hummock_event_sender
44 .send(HummockEvent::RegisterVectorWriter {
45 table_id,
46 init_epoch,
47 })
48 .map_err(|_| HummockError::other("failed to send register vector writer event"))?;
49 Ok(Self {
50 table_id,
51 hummock_event_sender,
52 })
53 }
54}
55
56impl Drop for VectorWriterInitGuard {
57 fn drop(&mut self) {
58 let _ = self
59 .hummock_event_sender
60 .send(HummockEvent::DropVectorWriter {
61 table_id: self.table_id,
62 });
63 }
64}
65
66struct VectorWriterState {
67 epoch: EpochPair,
68 writer_impl: VectorWriterImpl,
69 _guard: VectorWriterInitGuard,
70}
71
72pub struct HummockVectorWriter {
73 table_id: TableId,
74 version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
75 sstable_store: SstableStoreRef,
76 object_id_manager: ObjectIdManagerRef,
77 hummock_event_sender: HummockEventSender,
78 storage_opts: Arc<StorageOpts>,
79
80 state: Option<VectorWriterState>,
81}
82
83impl HummockVectorWriter {
84 pub(super) fn new(
85 table_id: TableId,
86 version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
87 sstable_store: SstableStoreRef,
88 object_id_manager: ObjectIdManagerRef,
89 hummock_event_sender: HummockEventSender,
90 storage_opts: Arc<StorageOpts>,
91 ) -> Self {
92 Self {
93 table_id,
94 version_update_notifier_tx,
95 sstable_store,
96 object_id_manager,
97 hummock_event_sender,
98 storage_opts,
99 state: None,
100 }
101 }
102}
103
104impl StateStoreWriteEpochControl for HummockVectorWriter {
105 async fn init(&mut self, opts: InitOptions) -> StorageResult<()> {
106 let version = wait_for_epoch(
107 &self.version_update_notifier_tx,
108 opts.epoch.prev,
109 self.table_id,
110 )
111 .await?;
112 let index = &version.vector_indexes[&self.table_id];
113 assert!(
114 self.state
115 .replace(VectorWriterState {
116 epoch: opts.epoch,
117 writer_impl: VectorWriterImpl::new(
118 index,
119 self.sstable_store.clone(),
120 self.object_id_manager.clone(),
121 &self.storage_opts,
122 ),
123 _guard: VectorWriterInitGuard::new(
124 self.table_id,
125 opts.epoch.curr,
126 self.hummock_event_sender.clone()
127 )?,
128 })
129 .is_none()
130 );
131 Ok(())
132 }
133
134 fn seal_current_epoch(&mut self, next_epoch: u64, _opts: SealCurrentEpochOptions) {
135 let state = self.state.as_mut().expect("should have init");
136 let epoch = &mut state.epoch;
137 assert!(next_epoch > epoch.curr);
138 epoch.prev = epoch.curr;
139 epoch.curr = next_epoch;
140 let _ = self
141 .hummock_event_sender
142 .send(HummockEvent::VectorWriterSealEpoch {
143 table_id: self.table_id,
144 next_epoch,
145 add: state.writer_impl.seal_current_epoch(),
146 });
147 }
148
149 async fn flush(&mut self) -> StorageResult<usize> {
150 Ok(self
151 .state
152 .as_mut()
153 .expect("should have init")
154 .writer_impl
155 .flush()
156 .await?)
157 }
158
159 async fn try_flush(&mut self) -> StorageResult<()> {
160 Ok(self
161 .state
162 .as_mut()
163 .expect("should have init")
164 .writer_impl
165 .try_flush()
166 .await?)
167 }
168}
169
170impl StateStoreWriteVector for HummockVectorWriter {
171 fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()> {
172 Ok(self
173 .state
174 .as_mut()
175 .expect("should have init")
176 .writer_impl
177 .insert(vec, info)?)
178 }
179}