1use std::cell::RefCell;
16use std::collections::{HashMap, HashSet};
17use std::marker::PhantomData;
18use std::sync::Arc;
19
20use await_tree::{InstrumentAwait, SpanExt};
21use bytes::Bytes;
22use futures::future::BoxFuture;
23use futures::{Future, FutureExt, TryFutureExt};
24use risingwave_common::bitmap::Bitmap;
25use risingwave_common::catalog::TableId;
26use risingwave_common::hash::VirtualNode;
27use risingwave_common::metrics::LabelGuardedHistogram;
28use risingwave_common::types::VectorRef;
29use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
30use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch, SyncResult};
31use thiserror_ext::AsReport;
32use tokio::time::Instant;
33use tracing::{Instrument, error};
34
35use super::{MonitoredStateStoreGetStats, MonitoredStateStoreIterStats, MonitoredStorageMetrics};
36use crate::error::StorageResult;
37use crate::hummock::sstable_store::SstableStoreRef;
38use crate::hummock::{HummockStorage, ObjectIdManagerRef};
39use crate::monitor::monitored_storage_metrics::StateStoreIterStats;
40use crate::monitor::{StateStoreIterLogStats, StateStoreIterStatsTrait};
41use crate::store::*;
42use crate::store_impl::AsHummock;
43
/// A state store wrapper that records storage metrics around calls to the
/// wrapped store.
///
/// `E` carries extra per-wrapper data: it is `()` for the plain wrapper and a
/// [`TableId`] for the table-scoped alias declared in this file.
#[derive(Clone)]
pub struct MonitoredStateStore<S, E = ()> {
    /// The wrapped state store that every operation is forwarded to.
    inner: Box<S>,
    /// Shared handle to the metrics that monitored operations report into.
    storage_metrics: Arc<MonitoredStorageMetrics>,
    /// Extra data attached to this wrapper (see the type-level docs).
    extra: E,
}
51
/// A [`MonitoredStateStore`] whose extra data is the [`TableId`] it serves.
type MonitoredTableStateStore<S> = MonitoredStateStore<S, TableId>;
53
54impl<S> MonitoredStateStore<S> {
55 pub fn new(inner: S, storage_metrics: Arc<MonitoredStorageMetrics>) -> Self {
56 Self {
57 inner: Box::new(inner),
58 storage_metrics,
59 extra: (),
60 }
61 }
62
63 pub fn inner(&self) -> &S {
64 &self.inner
65 }
66}
67
68impl<S> MonitoredTableStateStore<S> {
69 fn new(inner: S, storage_metrics: Arc<MonitoredStorageMetrics>, table_id: TableId) -> Self {
70 Self {
71 inner: Box::new(inner),
72 storage_metrics,
73 extra: table_id,
74 }
75 }
76
77 fn table_id(&self) -> TableId {
78 self.extra
79 }
80}
81
82impl<S, E> MonitoredStateStore<S, E> {
87 async fn monitored_iter<
88 'a,
89 Item: IterItem,
90 I: StateStoreIter<Item> + 'a,
91 Stat: StateStoreIterStatsTrait<Item = Item>,
92 >(
93 &'a self,
94 table_id: TableId,
95 iter_stream_future: impl Future<Output = StorageResult<I>> + 'a,
96 ) -> StorageResult<MonitoredStateStoreIter<Item, I, Stat>> {
97 let start_time = Instant::now();
100 let iter_stream = iter_stream_future
101 .await
102 .inspect_err(|e| error!(error = %e.as_report(), "Failed in iter"))?;
103 let iter_init_duration = start_time.elapsed();
104
105 let monitored = MonitoredStateStoreIter {
107 inner: iter_stream,
108 stats: MonitoredStateStoreIterStats {
109 inner: Stat::new(table_id, &self.storage_metrics, iter_init_duration),
110 table_id,
111 metrics: self.storage_metrics.clone(),
112 },
113 _phantom: PhantomData,
114 };
115 Ok(monitored)
116 }
117
118 async fn monitored_on_key_value<O>(
119 &self,
120 on_key_value_future: impl Future<Output = StorageResult<Option<(O, usize)>>>,
121 table_id: TableId,
122 key_len: usize,
123 ) -> StorageResult<Option<O>> {
124 let mut stats = MonitoredStateStoreGetStats::new(table_id, self.storage_metrics.clone());
125
126 let value = on_key_value_future
127 .instrument_await("store_on_key_value".verbose())
128 .instrument(tracing::trace_span!("store_on_key_value"))
129 .await
130 .inspect_err(|e| error!(error = %e.as_report(), "Failed in get"))?;
131
132 stats.get_key_size = key_len;
133 let value = value.map(|(value, value_len)| {
134 stats.get_value_size = value_len;
135 value
136 });
137 stats.report();
138
139 Ok(value)
140 }
141}
142
143impl<S: StateStoreGet> StateStoreGet for MonitoredTableStateStore<S> {
144 fn on_key_value<'a, O: Send + 'a>(
145 &'a self,
146 key: TableKey<Bytes>,
147 read_options: ReadOptions,
148 on_key_value_fn: impl KeyValueFn<'a, O>,
149 ) -> impl StorageFuture<'a, Option<O>> {
150 let table_id = self.table_id();
151 let key_len = key.len();
152 self.monitored_on_key_value(
153 self.inner
154 .on_key_value(key, read_options, move |key, value| {
155 let result = on_key_value_fn(key, value);
156 result.map(|output| (output, value.len()))
157 }),
158 table_id,
159 key_len,
160 )
161 }
162}
163
164impl<S: StateStoreRead> StateStoreRead for MonitoredTableStateStore<S> {
165 type Iter = impl StateStoreReadIter;
166 type RevIter = impl StateStoreReadIter;
167
168 fn iter(
169 &self,
170 key_range: TableKeyRange,
171 read_options: ReadOptions,
172 ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
173 self.monitored_iter::<'_, _, _, StateStoreIterStats>(
174 self.table_id(),
175 self.inner.iter(key_range, read_options),
176 )
177 }
178
179 fn rev_iter(
180 &self,
181 key_range: TableKeyRange,
182 read_options: ReadOptions,
183 ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
184 self.monitored_iter::<'_, _, _, StateStoreIterStats>(
185 self.table_id(),
186 self.inner.rev_iter(key_range, read_options),
187 )
188 }
189}
190
191impl<S: StateStoreReadLog> StateStoreReadLog for MonitoredStateStore<S> {
192 type ChangeLogIter = impl StateStoreReadChangeLogIter;
193
194 fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> impl StorageFuture<'_, u64> {
195 self.inner.next_epoch(epoch, options)
196 }
197
198 fn iter_log(
199 &self,
200 epoch_range: (u64, u64),
201 key_range: TableKeyRange,
202 options: ReadLogOptions,
203 ) -> impl Future<Output = StorageResult<Self::ChangeLogIter>> + Send + '_ {
204 self.monitored_iter::<'_, _, _, StateStoreIterLogStats>(
205 options.table_id,
206 self.inner.iter_log(epoch_range, key_range, options),
207 )
208 }
209}
210
211impl<S: StateStoreReadVector> StateStoreReadVector for MonitoredTableStateStore<S> {
212 fn nearest<'a, O: Send + 'a>(
213 &'a self,
214 vec: VectorRef<'a>,
215 options: VectorNearestOptions,
216 on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
217 ) -> impl StorageFuture<'a, Vec<O>> {
218 thread_local! {
219 static THREAD_HISTOGRAM_VEC: RefCell<HashMap<(TableId, usize, usize), LabelGuardedHistogram>> = RefCell::new(HashMap::new());
220 }
221 let start_time = Instant::now();
222 let metric_key = (self.table_id(), options.top_n, options.hnsw_ef_search);
223
224 self.inner
225 .nearest(vec, options, on_nearest_item_fn)
226 .inspect_ok(move |_| {
227 THREAD_HISTOGRAM_VEC.with_borrow_mut(|map| {
228 map.entry(metric_key)
229 .or_insert_with(|| {
230 let (table_id, top_n, ef) = metric_key;
231 let labels = [table_id.to_string(), top_n.to_string(), ef.to_string()];
232 self.storage_metrics
233 .vector_nearest_duration
234 .with_guarded_label_values(&labels.each_ref().map(|s| s.as_str()))
235 })
236 .observe(start_time.elapsed().as_secs_f64());
237 });
238 })
239 }
240}
241
242impl<S: LocalStateStore> LocalStateStore for MonitoredTableStateStore<S> {
243 type FlushedSnapshotReader = MonitoredTableStateStore<S::FlushedSnapshotReader>;
244
245 type Iter<'a> = impl StateStoreIter + 'a;
246 type RevIter<'a> = impl StateStoreIter + 'a;
247
248 fn iter(
249 &self,
250 key_range: TableKeyRange,
251 read_options: ReadOptions,
252 ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
253 self.monitored_iter::<'_, _, _, StateStoreIterStats>(
254 self.table_id(),
255 self.inner.iter(key_range, read_options),
256 )
257 }
258
259 fn rev_iter(
260 &self,
261 key_range: TableKeyRange,
262 read_options: ReadOptions,
263 ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
264 self.monitored_iter::<'_, _, _, StateStoreIterStats>(
265 self.table_id(),
266 self.inner.rev_iter(key_range, read_options),
267 )
268 }
269
270 fn insert(
271 &mut self,
272 key: TableKey<Bytes>,
273 new_val: Bytes,
274 old_val: Option<Bytes>,
275 ) -> StorageResult<()> {
276 self.inner.insert(key, new_val, old_val)
278 }
279
280 fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
281 self.inner.delete(key, old_val)
283 }
284
285 fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
286 self.inner.get_table_watermark(vnode)
287 }
288
289 fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
290 MonitoredTableStateStore::new(
291 self.inner.new_flushed_snapshot_reader(),
292 self.storage_metrics.clone(),
293 self.table_id(),
294 )
295 }
296
297 async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
298 self.inner.update_vnode_bitmap(vnodes).await
299 }
300}
301
302impl<S: StateStoreWriteEpochControl> StateStoreWriteEpochControl for MonitoredTableStateStore<S> {
303 fn flush(&mut self) -> impl Future<Output = StorageResult<usize>> + Send + '_ {
304 self.inner.flush().instrument_await("store_flush".verbose())
305 }
306
307 async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
308 self.inner.init(options).await
309 }
310
311 fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
312 self.inner.seal_current_epoch(next_epoch, opts)
314 }
315
316 fn try_flush(&mut self) -> impl Future<Output = StorageResult<()>> + Send + '_ {
317 self.inner
318 .try_flush()
319 .instrument_await("store_try_flush".verbose())
320 }
321}
322
323impl<S: StateStoreWriteVector> StateStoreWriteVector for MonitoredTableStateStore<S> {
324 fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()> {
325 self.inner.insert(vec, info)
327 }
328}
329
330impl<S: StateStore> StateStore for MonitoredStateStore<S> {
331 type Local = MonitoredTableStateStore<S::Local>;
332 type ReadSnapshot = MonitoredTableStateStore<S::ReadSnapshot>;
333 type VectorWriter = MonitoredTableStateStore<S::VectorWriter>;
334
335 fn try_wait_epoch(
336 &self,
337 epoch: HummockReadEpoch,
338 options: TryWaitEpochOptions,
339 ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
340 self.inner
341 .try_wait_epoch(epoch, options)
342 .instrument_await("store_wait_epoch".verbose())
343 .inspect_err(|e| error!(error = %e.as_report(), "Failed in wait_epoch"))
344 }
345
346 fn monitored(
347 self,
348 _storage_metrics: Arc<MonitoredStorageMetrics>,
349 ) -> MonitoredStateStore<Self> {
350 panic!("the state store is already monitored")
351 }
352
353 async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
354 let table_id = option.table_id;
355 MonitoredTableStateStore::new(
356 self.inner
357 .new_local(option)
358 .instrument_await("store_new_local")
359 .await,
360 self.storage_metrics.clone(),
361 table_id,
362 )
363 }
364
365 async fn new_read_snapshot(
366 &self,
367 epoch: HummockReadEpoch,
368 options: NewReadSnapshotOptions,
369 ) -> StorageResult<Self::ReadSnapshot> {
370 Ok(MonitoredTableStateStore::new(
371 self.inner.new_read_snapshot(epoch, options).await?,
372 self.storage_metrics.clone(),
373 options.table_id,
374 ))
375 }
376
377 async fn new_vector_writer(&self, options: NewVectorWriterOptions) -> Self::VectorWriter {
378 let table_id = options.table_id;
379 MonitoredTableStateStore::new(
380 self.inner.new_vector_writer(options).await,
381 self.storage_metrics.clone(),
382 table_id,
383 )
384 }
385}
386
387impl MonitoredStateStore<HummockStorage> {
388 pub fn sstable_store(&self) -> SstableStoreRef {
389 self.inner.sstable_store()
390 }
391
392 pub fn object_id_manager(&self) -> ObjectIdManagerRef {
393 self.inner.object_id_manager().clone()
394 }
395}
396
397impl<S: AsHummock> AsHummock for MonitoredStateStore<S> {
398 fn as_hummock(&self) -> Option<&HummockStorage> {
399 self.inner.as_hummock()
400 }
401
402 fn sync(
403 &self,
404 sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
405 ) -> BoxFuture<'_, StorageResult<SyncResult>> {
406 async move {
407 let future = self
408 .inner
409 .sync(sync_table_epochs)
410 .instrument_await("store_sync");
411 let timer = self.storage_metrics.sync_duration.start_timer();
412 let sync_size = self.storage_metrics.sync_size.clone();
413 let sync_result = future
414 .await
415 .inspect_err(|e| error!(error = %e.as_report(), "Failed in sync"))?;
416 timer.observe_duration();
417 if sync_result.sync_size != 0 {
418 sync_size.observe(sync_result.sync_size as _);
419 }
420 Ok(sync_result)
421 }
422 .boxed()
423 }
424}
425
426pub(crate) struct MonitoredStateStoreIter<
428 Item: IterItem,
429 I,
430 S: StateStoreIterStatsTrait<Item = Item>,
431> {
432 inner: I,
433 stats: MonitoredStateStoreIterStats<S>,
434 _phantom: PhantomData<Item>,
435}
436
437impl<Item: IterItem, I: StateStoreIter<Item>, S: StateStoreIterStatsTrait<Item = Item>>
438 StateStoreIter<Item> for MonitoredStateStoreIter<Item, I, S>
439{
440 async fn try_next(&mut self) -> StorageResult<Option<Item::ItemRef<'_>>> {
441 if let Some(item) = self
442 .inner
443 .try_next()
444 .instrument(tracing::trace_span!("store_iter_try_next"))
445 .await
446 .inspect_err(|e| error!(error = %e.as_report(), "Failed in next"))?
447 {
448 self.stats.inner.observe(item);
449 Ok(Some(item))
450 } else {
451 Ok(None)
452 }
453 }
454}