risingwave_storage/monitor/
monitored_store.rs1use std::collections::HashSet;
16use std::marker::PhantomData;
17use std::sync::Arc;
18
19use await_tree::{InstrumentAwait, SpanExt};
20use bytes::Bytes;
21use futures::future::BoxFuture;
22use futures::{Future, FutureExt, TryFutureExt};
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::TableId;
25use risingwave_common::hash::VirtualNode;
26use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
27use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch, SyncResult};
28use thiserror_ext::AsReport;
29use tokio::time::Instant;
30use tracing::{Instrument, error};
31
32use super::{MonitoredStateStoreGetStats, MonitoredStateStoreIterStats, MonitoredStorageMetrics};
33use crate::error::StorageResult;
34use crate::hummock::sstable_store::SstableStoreRef;
35use crate::hummock::{HummockStorage, SstableObjectIdManagerRef};
36use crate::monitor::monitored_storage_metrics::StateStoreIterStats;
37use crate::monitor::{StateStoreIterLogStats, StateStoreIterStatsTrait};
38use crate::store::*;
39use crate::store_impl::AsHummock;
40
41#[derive(Clone)]
43pub struct MonitoredStateStore<S> {
44 inner: Box<S>,
45
46 storage_metrics: Arc<MonitoredStorageMetrics>,
47}
48
49impl<S> MonitoredStateStore<S> {
50 pub fn new(inner: S, storage_metrics: Arc<MonitoredStorageMetrics>) -> Self {
51 Self {
52 inner: Box::new(inner),
53 storage_metrics,
54 }
55 }
56}
57
58impl<S> MonitoredStateStore<S> {
63 async fn monitored_iter<
64 'a,
65 Item: IterItem,
66 I: StateStoreIter<Item> + 'a,
67 Stat: StateStoreIterStatsTrait<Item = Item>,
68 >(
69 &'a self,
70 table_id: TableId,
71 iter_stream_future: impl Future<Output = StorageResult<I>> + 'a,
72 ) -> StorageResult<MonitoredStateStoreIter<Item, I, Stat>> {
73 let start_time = Instant::now();
76 let iter_stream = iter_stream_future
77 .await
78 .inspect_err(|e| error!(error = %e.as_report(), "Failed in iter"))?;
79 let iter_init_duration = start_time.elapsed();
80
81 let monitored = MonitoredStateStoreIter {
83 inner: iter_stream,
84 stats: MonitoredStateStoreIterStats {
85 inner: Stat::new(table_id.table_id, &self.storage_metrics, iter_init_duration),
86 table_id: table_id.table_id,
87 metrics: self.storage_metrics.clone(),
88 },
89 _phantom: PhantomData,
90 };
91 Ok(monitored)
92 }
93
94 pub fn inner(&self) -> &S {
95 &self.inner
96 }
97
98 async fn monitored_get(
99 &self,
100 get_future: impl Future<Output = StorageResult<Option<Bytes>>>,
101 table_id: TableId,
102 key_len: usize,
103 ) -> StorageResult<Option<Bytes>> {
104 let mut stats =
105 MonitoredStateStoreGetStats::new(table_id.table_id, self.storage_metrics.clone());
106
107 let value = get_future
108 .instrument_await("store_get".verbose())
109 .instrument(tracing::trace_span!("store_get"))
110 .await
111 .inspect_err(|e| error!(error = %e.as_report(), "Failed in get"))?;
112
113 stats.get_key_size = key_len;
114 if let Some(value) = value.as_ref() {
115 stats.get_value_size = value.len();
116 }
117 stats.report();
118
119 Ok(value)
120 }
121
122 async fn monitored_get_keyed_row(
123 &self,
124 get_keyed_row_future: impl Future<Output = StorageResult<Option<StateStoreKeyedRow>>>,
125 table_id: TableId,
126 key_len: usize,
127 ) -> StorageResult<Option<StateStoreKeyedRow>> {
128 let mut stats =
129 MonitoredStateStoreGetStats::new(table_id.table_id, self.storage_metrics.clone());
130
131 let value = get_keyed_row_future
132 .instrument_await("store_get_keyed_row".verbose())
133 .instrument(tracing::trace_span!("store_get_keyed_row"))
134 .await
135 .inspect_err(|e| error!(error = %e.as_report(), "Failed in get"))?;
136
137 stats.get_key_size = key_len;
138 if let Some((_, value)) = value.as_ref() {
139 stats.get_value_size = value.len();
140 }
141 stats.report();
142
143 Ok(value)
144 }
145}
146
147impl<S: StateStoreRead> StateStoreRead for MonitoredStateStore<S> {
148 type Iter = impl StateStoreReadIter;
149 type RevIter = impl StateStoreReadIter;
150
151 fn get_keyed_row(
152 &self,
153 key: TableKey<Bytes>,
154 read_options: ReadOptions,
155 ) -> impl Future<Output = StorageResult<Option<StateStoreKeyedRow>>> + '_ {
156 let table_id = read_options.table_id;
157 let key_len = key.len();
158 self.monitored_get_keyed_row(
159 self.inner.get_keyed_row(key, read_options),
160 table_id,
161 key_len,
162 )
163 }
164
165 fn iter(
166 &self,
167 key_range: TableKeyRange,
168 read_options: ReadOptions,
169 ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
170 self.monitored_iter::<'_, _, _, StateStoreIterStats>(
171 read_options.table_id,
172 self.inner.iter(key_range, read_options),
173 )
174 }
175
176 fn rev_iter(
177 &self,
178 key_range: TableKeyRange,
179 read_options: ReadOptions,
180 ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
181 self.monitored_iter::<'_, _, _, StateStoreIterStats>(
182 read_options.table_id,
183 self.inner.rev_iter(key_range, read_options),
184 )
185 }
186}
187
188impl<S: StateStoreReadLog> StateStoreReadLog for MonitoredStateStore<S> {
189 type ChangeLogIter = impl StateStoreReadChangeLogIter;
190
191 fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> impl StorageFuture<'_, u64> {
192 self.inner.next_epoch(epoch, options)
193 }
194
195 fn iter_log(
196 &self,
197 epoch_range: (u64, u64),
198 key_range: TableKeyRange,
199 options: ReadLogOptions,
200 ) -> impl Future<Output = StorageResult<Self::ChangeLogIter>> + Send + '_ {
201 self.monitored_iter::<'_, _, _, StateStoreIterLogStats>(
202 options.table_id,
203 self.inner.iter_log(epoch_range, key_range, options),
204 )
205 }
206}
207
208impl<S: LocalStateStore> LocalStateStore for MonitoredStateStore<S> {
209 type FlushedSnapshotReader = MonitoredStateStore<S::FlushedSnapshotReader>;
210
211 type Iter<'a> = impl StateStoreIter + 'a;
212 type RevIter<'a> = impl StateStoreIter + 'a;
213
214 fn get(
215 &self,
216 key: TableKey<Bytes>,
217 read_options: ReadOptions,
218 ) -> impl Future<Output = StorageResult<Option<Bytes>>> + Send + '_ {
219 let table_id = read_options.table_id;
220 let key_len = key.len();
221 self.monitored_get(self.inner.get(key, read_options), table_id, key_len)
223 }
224
225 fn iter(
226 &self,
227 key_range: TableKeyRange,
228 read_options: ReadOptions,
229 ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
230 let table_id = read_options.table_id;
231 self.monitored_iter::<'_, _, _, StateStoreIterStats>(
232 table_id,
233 self.inner.iter(key_range, read_options),
234 )
235 }
236
237 fn rev_iter(
238 &self,
239 key_range: TableKeyRange,
240 read_options: ReadOptions,
241 ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
242 let table_id = read_options.table_id;
243 self.monitored_iter::<'_, _, _, StateStoreIterStats>(
244 table_id,
245 self.inner.rev_iter(key_range, read_options),
246 )
247 }
248
249 fn insert(
250 &mut self,
251 key: TableKey<Bytes>,
252 new_val: Bytes,
253 old_val: Option<Bytes>,
254 ) -> StorageResult<()> {
255 self.inner.insert(key, new_val, old_val)
257 }
258
259 fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
260 self.inner.delete(key, old_val)
262 }
263
264 fn flush(&mut self) -> impl Future<Output = StorageResult<usize>> + Send + '_ {
265 self.inner.flush().instrument_await("store_flush".verbose())
266 }
267
268 fn epoch(&self) -> u64 {
269 self.inner.epoch()
270 }
271
272 fn is_dirty(&self) -> bool {
273 self.inner.is_dirty()
274 }
275
276 async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
277 self.inner.init(options).await
278 }
279
280 fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
281 self.inner.seal_current_epoch(next_epoch, opts)
283 }
284
285 fn try_flush(&mut self) -> impl Future<Output = StorageResult<()>> + Send + '_ {
286 self.inner
287 .try_flush()
288 .instrument_await("store_try_flush".verbose())
289 }
290
291 async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
292 self.inner.update_vnode_bitmap(vnodes).await
293 }
294
295 fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
296 self.inner.get_table_watermark(vnode)
297 }
298
299 fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
300 MonitoredStateStore::new(
301 self.inner.new_flushed_snapshot_reader(),
302 self.storage_metrics.clone(),
303 )
304 }
305}
306
307impl<S: StateStore> StateStore for MonitoredStateStore<S> {
308 type Local = MonitoredStateStore<S::Local>;
309 type ReadSnapshot = MonitoredStateStore<S::ReadSnapshot>;
310
311 fn try_wait_epoch(
312 &self,
313 epoch: HummockReadEpoch,
314 options: TryWaitEpochOptions,
315 ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
316 self.inner
317 .try_wait_epoch(epoch, options)
318 .instrument_await("store_wait_epoch".verbose())
319 .inspect_err(|e| error!(error = %e.as_report(), "Failed in wait_epoch"))
320 }
321
322 fn monitored(
323 self,
324 _storage_metrics: Arc<MonitoredStorageMetrics>,
325 ) -> MonitoredStateStore<Self> {
326 panic!("the state store is already monitored")
327 }
328
329 async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
330 MonitoredStateStore::new(
331 self.inner
332 .new_local(option)
333 .instrument_await("store_new_local")
334 .await,
335 self.storage_metrics.clone(),
336 )
337 }
338
339 async fn new_read_snapshot(
340 &self,
341 epoch: HummockReadEpoch,
342 options: NewReadSnapshotOptions,
343 ) -> StorageResult<Self::ReadSnapshot> {
344 Ok(MonitoredStateStore::new(
345 self.inner.new_read_snapshot(epoch, options).await?,
346 self.storage_metrics.clone(),
347 ))
348 }
349}
350
351impl MonitoredStateStore<HummockStorage> {
352 pub fn sstable_store(&self) -> SstableStoreRef {
353 self.inner.sstable_store()
354 }
355
356 pub fn sstable_object_id_manager(&self) -> SstableObjectIdManagerRef {
357 self.inner.sstable_object_id_manager().clone()
358 }
359}
360
361impl<S: AsHummock> AsHummock for MonitoredStateStore<S> {
362 fn as_hummock(&self) -> Option<&HummockStorage> {
363 self.inner.as_hummock()
364 }
365
366 fn sync(
367 &self,
368 sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
369 ) -> BoxFuture<'_, StorageResult<SyncResult>> {
370 async move {
371 let future = self
372 .inner
373 .sync(sync_table_epochs)
374 .instrument_await("store_sync");
375 let timer = self.storage_metrics.sync_duration.start_timer();
376 let sync_size = self.storage_metrics.sync_size.clone();
377 let sync_result = future
378 .await
379 .inspect_err(|e| error!(error = %e.as_report(), "Failed in sync"))?;
380 timer.observe_duration();
381 if sync_result.sync_size != 0 {
382 sync_size.observe(sync_result.sync_size as _);
383 }
384 Ok(sync_result)
385 }
386 .boxed()
387 }
388}
389
390pub(crate) struct MonitoredStateStoreIter<
392 Item: IterItem,
393 I,
394 S: StateStoreIterStatsTrait<Item = Item>,
395> {
396 inner: I,
397 stats: MonitoredStateStoreIterStats<S>,
398 _phantom: PhantomData<Item>,
399}
400
401impl<Item: IterItem, I: StateStoreIter<Item>, S: StateStoreIterStatsTrait<Item = Item>>
402 StateStoreIter<Item> for MonitoredStateStoreIter<Item, I, S>
403{
404 async fn try_next(&mut self) -> StorageResult<Option<Item::ItemRef<'_>>> {
405 if let Some(item) = self
406 .inner
407 .try_next()
408 .instrument(tracing::trace_span!("store_iter_try_next"))
409 .await
410 .inspect_err(|e| error!(error = %e.as_report(), "Failed in next"))?
411 {
412 self.stats.inner.observe(item);
413 Ok(Some(item))
414 } else {
415 Ok(None)
416 }
417 }
418}