risingwave_storage/monitor/monitored_store.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::cell::RefCell;
16use std::collections::{HashMap, HashSet};
17use std::marker::PhantomData;
18use std::sync::Arc;
19
20use await_tree::{InstrumentAwait, SpanExt};
21use bytes::Bytes;
22use futures::future::BoxFuture;
23use futures::{Future, FutureExt, TryFutureExt};
24use risingwave_common::bitmap::Bitmap;
25use risingwave_common::catalog::TableId;
26use risingwave_common::hash::VirtualNode;
27use risingwave_common::metrics::LabelGuardedHistogram;
28use risingwave_common::types::VectorRef;
29use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
30use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch, SyncResult};
31use thiserror_ext::AsReport;
32use tokio::time::Instant;
33use tracing::{Instrument, error};
34
35use super::{MonitoredStateStoreGetStats, MonitoredStateStoreIterStats, MonitoredStorageMetrics};
36use crate::error::StorageResult;
37use crate::hummock::sstable_store::SstableStoreRef;
38use crate::hummock::{HummockStorage, ObjectIdManagerRef};
39use crate::monitor::monitored_storage_metrics::StateStoreIterStats;
40use crate::monitor::{StateStoreIterLogStats, StateStoreIterStatsTrait};
41use crate::store::*;
42use crate::store_impl::AsHummock;
43
44/// A state store wrapper for monitoring metrics.
45#[derive(Clone)]
46pub struct MonitoredStateStore<S, E = ()> {
47    inner: Box<S>,
48    storage_metrics: Arc<MonitoredStorageMetrics>,
49    extra: E,
50}
51
52type MonitoredTableStateStore<S> = MonitoredStateStore<S, TableId>;
53
54impl<S> MonitoredStateStore<S> {
55    pub fn new(inner: S, storage_metrics: Arc<MonitoredStorageMetrics>) -> Self {
56        Self {
57            inner: Box::new(inner),
58            storage_metrics,
59            extra: (),
60        }
61    }
62
63    pub fn inner(&self) -> &S {
64        &self.inner
65    }
66}
67
68impl<S> MonitoredTableStateStore<S> {
69    fn new(inner: S, storage_metrics: Arc<MonitoredStorageMetrics>, table_id: TableId) -> Self {
70        Self {
71            inner: Box::new(inner),
72            storage_metrics,
73            extra: table_id,
74        }
75    }
76
77    fn table_id(&self) -> TableId {
78        self.extra
79    }
80}
81
82// Note: it is important to define the `MonitoredStateStoreIter` type alias, as it marks that
83// the return type of `monitored_iter` only captures the lifetime `'s` and has nothing to do with
84// `'a`. If we simply use `impl StateStoreIter + 's`, the rust compiler will also capture
85// the lifetime `'a` in the scope defined in the scope.
86impl<S, E> MonitoredStateStore<S, E> {
87    async fn monitored_iter<
88        'a,
89        Item: IterItem,
90        I: StateStoreIter<Item> + 'a,
91        Stat: StateStoreIterStatsTrait<Item = Item>,
92    >(
93        &'a self,
94        table_id: TableId,
95        iter_stream_future: impl Future<Output = StorageResult<I>> + 'a,
96    ) -> StorageResult<MonitoredStateStoreIter<Item, I, Stat>> {
97        // start time takes iterator build time into account
98        // wait for iterator creation (e.g. seek)
99        let start_time = Instant::now();
100        let iter_stream = iter_stream_future
101            .await
102            .inspect_err(|e| error!(error = %e.as_report(), "Failed in iter"))?;
103        let iter_init_duration = start_time.elapsed();
104
105        // create a monitored iterator to collect metrics
106        let monitored = MonitoredStateStoreIter {
107            inner: iter_stream,
108            stats: MonitoredStateStoreIterStats {
109                inner: Stat::new(table_id, &self.storage_metrics, iter_init_duration),
110                table_id,
111                metrics: self.storage_metrics.clone(),
112            },
113            _phantom: PhantomData,
114        };
115        Ok(monitored)
116    }
117
118    async fn monitored_on_key_value<O>(
119        &self,
120        on_key_value_future: impl Future<Output = StorageResult<Option<(O, usize)>>>,
121        table_id: TableId,
122        key_len: usize,
123    ) -> StorageResult<Option<O>> {
124        let mut stats = MonitoredStateStoreGetStats::new(table_id, self.storage_metrics.clone());
125
126        let value = on_key_value_future
127            .instrument_await("store_on_key_value".verbose())
128            .instrument(tracing::trace_span!("store_on_key_value"))
129            .await
130            .inspect_err(|e| error!(error = %e.as_report(), "Failed in get"))?;
131
132        stats.get_key_size = key_len;
133        let value = value.map(|(value, value_len)| {
134            stats.get_value_size = value_len;
135            value
136        });
137        stats.report();
138
139        Ok(value)
140    }
141}
142
143impl<S: StateStoreGet> StateStoreGet for MonitoredTableStateStore<S> {
144    fn on_key_value<'a, O: Send + 'a>(
145        &'a self,
146        key: TableKey<Bytes>,
147        read_options: ReadOptions,
148        on_key_value_fn: impl KeyValueFn<'a, O>,
149    ) -> impl StorageFuture<'a, Option<O>> {
150        let table_id = self.table_id();
151        let key_len = key.len();
152        self.monitored_on_key_value(
153            self.inner
154                .on_key_value(key, read_options, move |key, value| {
155                    let result = on_key_value_fn(key, value);
156                    result.map(|output| (output, value.len()))
157                }),
158            table_id,
159            key_len,
160        )
161    }
162}
163
164impl<S: StateStoreRead> StateStoreRead for MonitoredTableStateStore<S> {
165    type Iter = impl StateStoreReadIter;
166    type RevIter = impl StateStoreReadIter;
167
168    fn iter(
169        &self,
170        key_range: TableKeyRange,
171        read_options: ReadOptions,
172    ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
173        self.monitored_iter::<'_, _, _, StateStoreIterStats>(
174            self.table_id(),
175            self.inner.iter(key_range, read_options),
176        )
177    }
178
179    fn rev_iter(
180        &self,
181        key_range: TableKeyRange,
182        read_options: ReadOptions,
183    ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
184        self.monitored_iter::<'_, _, _, StateStoreIterStats>(
185            self.table_id(),
186            self.inner.rev_iter(key_range, read_options),
187        )
188    }
189}
190
191impl<S: StateStoreReadLog> StateStoreReadLog for MonitoredStateStore<S> {
192    type ChangeLogIter = impl StateStoreReadChangeLogIter;
193
194    fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> impl StorageFuture<'_, u64> {
195        self.inner.next_epoch(epoch, options)
196    }
197
198    fn iter_log(
199        &self,
200        epoch_range: (u64, u64),
201        key_range: TableKeyRange,
202        options: ReadLogOptions,
203    ) -> impl Future<Output = StorageResult<Self::ChangeLogIter>> + Send + '_ {
204        self.monitored_iter::<'_, _, _, StateStoreIterLogStats>(
205            options.table_id,
206            self.inner.iter_log(epoch_range, key_range, options),
207        )
208    }
209}
210
211impl<S: StateStoreReadVector> StateStoreReadVector for MonitoredTableStateStore<S> {
212    fn nearest<'a, O: Send + 'a>(
213        &'a self,
214        vec: VectorRef<'a>,
215        options: VectorNearestOptions,
216        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
217    ) -> impl StorageFuture<'a, Vec<O>> {
218        thread_local! {
219            static THREAD_HISTOGRAM_VEC: RefCell<HashMap<(TableId, usize, usize), LabelGuardedHistogram>> = RefCell::new(HashMap::new());
220        }
221        let start_time = Instant::now();
222        let metric_key = (self.table_id(), options.top_n, options.hnsw_ef_search);
223
224        self.inner
225            .nearest(vec, options, on_nearest_item_fn)
226            .inspect_ok(move |_| {
227                THREAD_HISTOGRAM_VEC.with_borrow_mut(|map| {
228                    map.entry(metric_key)
229                        .or_insert_with(|| {
230                            let (table_id, top_n, ef) = metric_key;
231                            let labels = [table_id.to_string(), top_n.to_string(), ef.to_string()];
232                            self.storage_metrics
233                                .vector_nearest_duration
234                                .with_guarded_label_values(&labels.each_ref().map(|s| s.as_str()))
235                        })
236                        .observe(start_time.elapsed().as_secs_f64());
237                });
238            })
239    }
240}
241
242impl<S: LocalStateStore> LocalStateStore for MonitoredTableStateStore<S> {
243    type FlushedSnapshotReader = MonitoredTableStateStore<S::FlushedSnapshotReader>;
244
245    type Iter<'a> = impl StateStoreIter + 'a;
246    type RevIter<'a> = impl StateStoreIter + 'a;
247
248    fn iter(
249        &self,
250        key_range: TableKeyRange,
251        read_options: ReadOptions,
252    ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
253        self.monitored_iter::<'_, _, _, StateStoreIterStats>(
254            self.table_id(),
255            self.inner.iter(key_range, read_options),
256        )
257    }
258
259    fn rev_iter(
260        &self,
261        key_range: TableKeyRange,
262        read_options: ReadOptions,
263    ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
264        self.monitored_iter::<'_, _, _, StateStoreIterStats>(
265            self.table_id(),
266            self.inner.rev_iter(key_range, read_options),
267        )
268    }
269
270    fn insert(
271        &mut self,
272        key: TableKey<Bytes>,
273        new_val: Bytes,
274        old_val: Option<Bytes>,
275    ) -> StorageResult<()> {
276        // TODO: collect metrics
277        self.inner.insert(key, new_val, old_val)
278    }
279
280    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
281        // TODO: collect metrics
282        self.inner.delete(key, old_val)
283    }
284
285    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
286        self.inner.get_table_watermark(vnode)
287    }
288
289    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
290        MonitoredTableStateStore::new(
291            self.inner.new_flushed_snapshot_reader(),
292            self.storage_metrics.clone(),
293            self.table_id(),
294        )
295    }
296
297    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
298        self.inner.update_vnode_bitmap(vnodes).await
299    }
300}
301
302impl<S: StateStoreWriteEpochControl> StateStoreWriteEpochControl for MonitoredTableStateStore<S> {
303    fn flush(&mut self) -> impl Future<Output = StorageResult<usize>> + Send + '_ {
304        self.inner.flush().instrument_await("store_flush".verbose())
305    }
306
307    async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
308        self.inner.init(options).await
309    }
310
311    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
312        // TODO: may collect metrics
313        self.inner.seal_current_epoch(next_epoch, opts)
314    }
315
316    fn try_flush(&mut self) -> impl Future<Output = StorageResult<()>> + Send + '_ {
317        self.inner
318            .try_flush()
319            .instrument_await("store_try_flush".verbose())
320    }
321}
322
323impl<S: StateStoreWriteVector> StateStoreWriteVector for MonitoredTableStateStore<S> {
324    fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()> {
325        // TODO: monitor
326        self.inner.insert(vec, info)
327    }
328}
329
330impl<S: StateStore> StateStore for MonitoredStateStore<S> {
331    type Local = MonitoredTableStateStore<S::Local>;
332    type ReadSnapshot = MonitoredTableStateStore<S::ReadSnapshot>;
333    type VectorWriter = MonitoredTableStateStore<S::VectorWriter>;
334
335    fn try_wait_epoch(
336        &self,
337        epoch: HummockReadEpoch,
338        options: TryWaitEpochOptions,
339    ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
340        self.inner
341            .try_wait_epoch(epoch, options)
342            .instrument_await("store_wait_epoch".verbose())
343            .inspect_err(|e| error!(error = %e.as_report(), "Failed in wait_epoch"))
344    }
345
346    fn monitored(
347        self,
348        _storage_metrics: Arc<MonitoredStorageMetrics>,
349    ) -> MonitoredStateStore<Self> {
350        panic!("the state store is already monitored")
351    }
352
353    async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
354        let table_id = option.table_id;
355        MonitoredTableStateStore::new(
356            self.inner
357                .new_local(option)
358                .instrument_await("store_new_local")
359                .await,
360            self.storage_metrics.clone(),
361            table_id,
362        )
363    }
364
365    async fn new_read_snapshot(
366        &self,
367        epoch: HummockReadEpoch,
368        options: NewReadSnapshotOptions,
369    ) -> StorageResult<Self::ReadSnapshot> {
370        Ok(MonitoredTableStateStore::new(
371            self.inner.new_read_snapshot(epoch, options).await?,
372            self.storage_metrics.clone(),
373            options.table_id,
374        ))
375    }
376
377    async fn new_vector_writer(&self, options: NewVectorWriterOptions) -> Self::VectorWriter {
378        let table_id = options.table_id;
379        MonitoredTableStateStore::new(
380            self.inner.new_vector_writer(options).await,
381            self.storage_metrics.clone(),
382            table_id,
383        )
384    }
385}
386
387impl MonitoredStateStore<HummockStorage> {
388    pub fn sstable_store(&self) -> SstableStoreRef {
389        self.inner.sstable_store()
390    }
391
392    pub fn object_id_manager(&self) -> ObjectIdManagerRef {
393        self.inner.object_id_manager().clone()
394    }
395}
396
397impl<S: AsHummock> AsHummock for MonitoredStateStore<S> {
398    fn as_hummock(&self) -> Option<&HummockStorage> {
399        self.inner.as_hummock()
400    }
401
402    fn sync(
403        &self,
404        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
405    ) -> BoxFuture<'_, StorageResult<SyncResult>> {
406        async move {
407            let future = self
408                .inner
409                .sync(sync_table_epochs)
410                .instrument_await("store_sync");
411            let timer = self.storage_metrics.sync_duration.start_timer();
412            let sync_size = self.storage_metrics.sync_size.clone();
413            let sync_result = future
414                .await
415                .inspect_err(|e| error!(error = %e.as_report(), "Failed in sync"))?;
416            timer.observe_duration();
417            if sync_result.sync_size != 0 {
418                sync_size.observe(sync_result.sync_size as _);
419            }
420            Ok(sync_result)
421        }
422        .boxed()
423    }
424}
425
426/// A state store iterator wrapper for monitoring metrics.
427pub(crate) struct MonitoredStateStoreIter<
428    Item: IterItem,
429    I,
430    S: StateStoreIterStatsTrait<Item = Item>,
431> {
432    inner: I,
433    stats: MonitoredStateStoreIterStats<S>,
434    _phantom: PhantomData<Item>,
435}
436
437impl<Item: IterItem, I: StateStoreIter<Item>, S: StateStoreIterStatsTrait<Item = Item>>
438    StateStoreIter<Item> for MonitoredStateStoreIter<Item, I, S>
439{
440    async fn try_next(&mut self) -> StorageResult<Option<Item::ItemRef<'_>>> {
441        if let Some(item) = self
442            .inner
443            .try_next()
444            .instrument(tracing::trace_span!("store_iter_try_next"))
445            .await
446            .inspect_err(|e| error!(error = %e.as_report(), "Failed in next"))?
447        {
448            self.stats.inner.observe(item);
449            Ok(Some(item))
450        } else {
451            Ok(None)
452        }
453    }
454}