1use std::collections::HashSet;
16use std::marker::PhantomData;
17use std::sync::Arc;
18
19use await_tree::{InstrumentAwait, SpanExt};
20use bytes::Bytes;
21use futures::future::BoxFuture;
22use futures::{Future, FutureExt, TryFutureExt};
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::TableId;
25use risingwave_common::hash::VirtualNode;
26use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
27use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch, SyncResult};
28use thiserror_ext::AsReport;
29use tokio::time::Instant;
30use tracing::{Instrument, error};
31
32use super::{MonitoredStateStoreGetStats, MonitoredStateStoreIterStats, MonitoredStorageMetrics};
33use crate::error::StorageResult;
34use crate::hummock::sstable_store::SstableStoreRef;
35use crate::hummock::{HummockStorage, ObjectIdManagerRef};
36use crate::monitor::monitored_storage_metrics::StateStoreIterStats;
37use crate::monitor::{StateStoreIterLogStats, StateStoreIterStatsTrait};
38use crate::store::*;
39use crate::store_impl::AsHummock;
40
/// Wraps a state store `S` and reports storage metrics into
/// [`MonitoredStorageMetrics`] around some of the calls it forwards to `S`
/// (point gets, iterator creation and iteration, and sync).
///
/// `E` is per-instance extra data: `()` for the store-level wrapper, or a
/// [`TableId`] for the per-table wrapper ([`MonitoredTableStateStore`]).
#[derive(Clone)]
pub struct MonitoredStateStore<S, E = ()> {
    /// The wrapped state store.
    inner: Box<S>,
    /// Shared metrics sink that this wrapper reports into.
    storage_metrics: Arc<MonitoredStorageMetrics>,
    /// Extra per-instance data (`()` or the table id the wrapper is bound to).
    extra: E,
}

/// A [`MonitoredStateStore`] bound to a single table; the table id is kept in
/// `extra` and used as the table id of the metrics it reports.
type MonitoredTableStateStore<S> = MonitoredStateStore<S, TableId>;
50
51impl<S> MonitoredStateStore<S> {
52 pub fn new(inner: S, storage_metrics: Arc<MonitoredStorageMetrics>) -> Self {
53 Self {
54 inner: Box::new(inner),
55 storage_metrics,
56 extra: (),
57 }
58 }
59
60 pub fn inner(&self) -> &S {
61 &self.inner
62 }
63}
64
65impl<S> MonitoredTableStateStore<S> {
66 fn new(inner: S, storage_metrics: Arc<MonitoredStorageMetrics>, table_id: TableId) -> Self {
67 Self {
68 inner: Box::new(inner),
69 storage_metrics,
70 extra: table_id,
71 }
72 }
73
74 fn table_id(&self) -> TableId {
75 self.extra
76 }
77}
78
79impl<S, E> MonitoredStateStore<S, E> {
84 async fn monitored_iter<
85 'a,
86 Item: IterItem,
87 I: StateStoreIter<Item> + 'a,
88 Stat: StateStoreIterStatsTrait<Item = Item>,
89 >(
90 &'a self,
91 table_id: TableId,
92 iter_stream_future: impl Future<Output = StorageResult<I>> + 'a,
93 ) -> StorageResult<MonitoredStateStoreIter<Item, I, Stat>> {
94 let start_time = Instant::now();
97 let iter_stream = iter_stream_future
98 .await
99 .inspect_err(|e| error!(error = %e.as_report(), "Failed in iter"))?;
100 let iter_init_duration = start_time.elapsed();
101
102 let monitored = MonitoredStateStoreIter {
104 inner: iter_stream,
105 stats: MonitoredStateStoreIterStats {
106 inner: Stat::new(table_id.table_id, &self.storage_metrics, iter_init_duration),
107 table_id: table_id.table_id,
108 metrics: self.storage_metrics.clone(),
109 },
110 _phantom: PhantomData,
111 };
112 Ok(monitored)
113 }
114
115 async fn monitored_on_key_value<O>(
116 &self,
117 on_key_value_future: impl Future<Output = StorageResult<Option<(O, usize)>>>,
118 table_id: TableId,
119 key_len: usize,
120 ) -> StorageResult<Option<O>> {
121 let mut stats =
122 MonitoredStateStoreGetStats::new(table_id.table_id, self.storage_metrics.clone());
123
124 let value = on_key_value_future
125 .instrument_await("store_on_key_value".verbose())
126 .instrument(tracing::trace_span!("store_on_key_value"))
127 .await
128 .inspect_err(|e| error!(error = %e.as_report(), "Failed in get"))?;
129
130 stats.get_key_size = key_len;
131 let value = value.map(|(value, value_len)| {
132 stats.get_value_size = value_len;
133 value
134 });
135 stats.report();
136
137 Ok(value)
138 }
139}
140
141impl<S: StateStoreGet> StateStoreGet for MonitoredTableStateStore<S> {
142 fn on_key_value<O: Send + 'static>(
143 &self,
144 key: TableKey<Bytes>,
145 read_options: ReadOptions,
146 on_key_value_fn: impl KeyValueFn<O>,
147 ) -> impl StorageFuture<'_, Option<O>> {
148 let table_id = self.table_id();
149 let key_len = key.len();
150 self.monitored_on_key_value(
151 self.inner
152 .on_key_value(key, read_options, move |key, value| {
153 let result = on_key_value_fn(key, value);
154 result.map(|output| (output, value.len()))
155 }),
156 table_id,
157 key_len,
158 )
159 }
160}
161
162impl<S: StateStoreRead> StateStoreRead for MonitoredTableStateStore<S> {
163 type Iter = impl StateStoreReadIter;
164 type RevIter = impl StateStoreReadIter;
165
166 fn iter(
167 &self,
168 key_range: TableKeyRange,
169 read_options: ReadOptions,
170 ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
171 self.monitored_iter::<'_, _, _, StateStoreIterStats>(
172 self.table_id(),
173 self.inner.iter(key_range, read_options),
174 )
175 }
176
177 fn rev_iter(
178 &self,
179 key_range: TableKeyRange,
180 read_options: ReadOptions,
181 ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
182 self.monitored_iter::<'_, _, _, StateStoreIterStats>(
183 self.table_id(),
184 self.inner.rev_iter(key_range, read_options),
185 )
186 }
187}
188
189impl<S: StateStoreReadLog> StateStoreReadLog for MonitoredStateStore<S> {
190 type ChangeLogIter = impl StateStoreReadChangeLogIter;
191
192 fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> impl StorageFuture<'_, u64> {
193 self.inner.next_epoch(epoch, options)
194 }
195
196 fn iter_log(
197 &self,
198 epoch_range: (u64, u64),
199 key_range: TableKeyRange,
200 options: ReadLogOptions,
201 ) -> impl Future<Output = StorageResult<Self::ChangeLogIter>> + Send + '_ {
202 self.monitored_iter::<'_, _, _, StateStoreIterLogStats>(
203 options.table_id,
204 self.inner.iter_log(epoch_range, key_range, options),
205 )
206 }
207}
208
209impl<S: StateStoreReadVector> StateStoreReadVector for MonitoredTableStateStore<S> {
210 fn nearest<O: Send + 'static>(
211 &self,
212 vec: Vector,
213 options: VectorNearestOptions,
214 on_nearest_item_fn: impl OnNearestItemFn<O>,
215 ) -> impl StorageFuture<'_, Vec<O>> {
216 self.inner.nearest(vec, options, on_nearest_item_fn)
218 }
219}
220
221impl<S: LocalStateStore> LocalStateStore for MonitoredTableStateStore<S> {
222 type FlushedSnapshotReader = MonitoredTableStateStore<S::FlushedSnapshotReader>;
223
224 type Iter<'a> = impl StateStoreIter + 'a;
225 type RevIter<'a> = impl StateStoreIter + 'a;
226
227 fn iter(
228 &self,
229 key_range: TableKeyRange,
230 read_options: ReadOptions,
231 ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
232 self.monitored_iter::<'_, _, _, StateStoreIterStats>(
233 self.table_id(),
234 self.inner.iter(key_range, read_options),
235 )
236 }
237
238 fn rev_iter(
239 &self,
240 key_range: TableKeyRange,
241 read_options: ReadOptions,
242 ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
243 self.monitored_iter::<'_, _, _, StateStoreIterStats>(
244 self.table_id(),
245 self.inner.rev_iter(key_range, read_options),
246 )
247 }
248
249 fn insert(
250 &mut self,
251 key: TableKey<Bytes>,
252 new_val: Bytes,
253 old_val: Option<Bytes>,
254 ) -> StorageResult<()> {
255 self.inner.insert(key, new_val, old_val)
257 }
258
259 fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
260 self.inner.delete(key, old_val)
262 }
263
264 fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
265 self.inner.get_table_watermark(vnode)
266 }
267
268 fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
269 MonitoredTableStateStore::new(
270 self.inner.new_flushed_snapshot_reader(),
271 self.storage_metrics.clone(),
272 self.table_id(),
273 )
274 }
275
276 async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
277 self.inner.update_vnode_bitmap(vnodes).await
278 }
279}
280
281impl<S: StateStoreWriteEpochControl> StateStoreWriteEpochControl for MonitoredTableStateStore<S> {
282 fn flush(&mut self) -> impl Future<Output = StorageResult<usize>> + Send + '_ {
283 self.inner.flush().instrument_await("store_flush".verbose())
284 }
285
286 async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
287 self.inner.init(options).await
288 }
289
290 fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
291 self.inner.seal_current_epoch(next_epoch, opts)
293 }
294
295 fn try_flush(&mut self) -> impl Future<Output = StorageResult<()>> + Send + '_ {
296 self.inner
297 .try_flush()
298 .instrument_await("store_try_flush".verbose())
299 }
300}
301
302impl<S: StateStoreWriteVector> StateStoreWriteVector for MonitoredTableStateStore<S> {
303 fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()> {
304 self.inner.insert(vec, info)
306 }
307}
308
309impl<S: StateStore> StateStore for MonitoredStateStore<S> {
310 type Local = MonitoredTableStateStore<S::Local>;
311 type ReadSnapshot = MonitoredTableStateStore<S::ReadSnapshot>;
312 type VectorWriter = MonitoredTableStateStore<S::VectorWriter>;
313
314 fn try_wait_epoch(
315 &self,
316 epoch: HummockReadEpoch,
317 options: TryWaitEpochOptions,
318 ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
319 self.inner
320 .try_wait_epoch(epoch, options)
321 .instrument_await("store_wait_epoch".verbose())
322 .inspect_err(|e| error!(error = %e.as_report(), "Failed in wait_epoch"))
323 }
324
325 fn monitored(
326 self,
327 _storage_metrics: Arc<MonitoredStorageMetrics>,
328 ) -> MonitoredStateStore<Self> {
329 panic!("the state store is already monitored")
330 }
331
332 async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
333 let table_id = option.table_id;
334 MonitoredTableStateStore::new(
335 self.inner
336 .new_local(option)
337 .instrument_await("store_new_local")
338 .await,
339 self.storage_metrics.clone(),
340 table_id,
341 )
342 }
343
344 async fn new_read_snapshot(
345 &self,
346 epoch: HummockReadEpoch,
347 options: NewReadSnapshotOptions,
348 ) -> StorageResult<Self::ReadSnapshot> {
349 Ok(MonitoredTableStateStore::new(
350 self.inner.new_read_snapshot(epoch, options).await?,
351 self.storage_metrics.clone(),
352 options.table_id,
353 ))
354 }
355
356 async fn new_vector_writer(&self, options: NewVectorWriterOptions) -> Self::VectorWriter {
357 let table_id = options.table_id;
358 MonitoredTableStateStore::new(
359 self.inner.new_vector_writer(options).await,
360 self.storage_metrics.clone(),
361 table_id,
362 )
363 }
364}
365
366impl MonitoredStateStore<HummockStorage> {
367 pub fn sstable_store(&self) -> SstableStoreRef {
368 self.inner.sstable_store()
369 }
370
371 pub fn object_id_manager(&self) -> ObjectIdManagerRef {
372 self.inner.object_id_manager().clone()
373 }
374}
375
376impl<S: AsHummock> AsHummock for MonitoredStateStore<S> {
377 fn as_hummock(&self) -> Option<&HummockStorage> {
378 self.inner.as_hummock()
379 }
380
381 fn sync(
382 &self,
383 sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
384 ) -> BoxFuture<'_, StorageResult<SyncResult>> {
385 async move {
386 let future = self
387 .inner
388 .sync(sync_table_epochs)
389 .instrument_await("store_sync");
390 let timer = self.storage_metrics.sync_duration.start_timer();
391 let sync_size = self.storage_metrics.sync_size.clone();
392 let sync_result = future
393 .await
394 .inspect_err(|e| error!(error = %e.as_report(), "Failed in sync"))?;
395 timer.observe_duration();
396 if sync_result.sync_size != 0 {
397 sync_size.observe(sync_result.sync_size as _);
398 }
399 Ok(sync_result)
400 }
401 .boxed()
402 }
403}
404
/// Iterator wrapper that passes every item yielded by the inner iterator `I`
/// to the statistics collector `S` (see its `try_next` implementation).
pub(crate) struct MonitoredStateStoreIter<
    Item: IterItem,
    I,
    S: StateStoreIterStatsTrait<Item = Item>,
> {
    /// The wrapped iterator.
    inner: I,
    /// Statistics for this iterator; its `inner` collector observes each item.
    stats: MonitoredStateStoreIterStats<S>,
    /// Binds the otherwise-unused `Item` type parameter to the struct.
    _phantom: PhantomData<Item>,
}
415
416impl<Item: IterItem, I: StateStoreIter<Item>, S: StateStoreIterStatsTrait<Item = Item>>
417 StateStoreIter<Item> for MonitoredStateStoreIter<Item, I, S>
418{
419 async fn try_next(&mut self) -> StorageResult<Option<Item::ItemRef<'_>>> {
420 if let Some(item) = self
421 .inner
422 .try_next()
423 .instrument(tracing::trace_span!("store_iter_try_next"))
424 .await
425 .inspect_err(|e| error!(error = %e.as_report(), "Failed in next"))?
426 {
427 self.stats.inner.observe(item);
428 Ok(Some(item))
429 } else {
430 Ok(None)
431 }
432 }
433}