risingwave_storage/monitor/
monitored_store.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::marker::PhantomData;
17use std::sync::Arc;
18
19use await_tree::{InstrumentAwait, SpanExt};
20use bytes::Bytes;
21use futures::future::BoxFuture;
22use futures::{Future, FutureExt, TryFutureExt};
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::TableId;
25use risingwave_common::hash::VirtualNode;
26use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
27use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch, SyncResult};
28use thiserror_ext::AsReport;
29use tokio::time::Instant;
30use tracing::{Instrument, error};
31
32use super::{MonitoredStateStoreGetStats, MonitoredStateStoreIterStats, MonitoredStorageMetrics};
33use crate::error::StorageResult;
34use crate::hummock::sstable_store::SstableStoreRef;
35use crate::hummock::{HummockStorage, SstableObjectIdManagerRef};
36use crate::monitor::monitored_storage_metrics::StateStoreIterStats;
37use crate::monitor::{StateStoreIterLogStats, StateStoreIterStatsTrait};
38use crate::store::*;
39use crate::store_impl::AsHummock;
40
41/// A state store wrapper for monitoring metrics.
42#[derive(Clone)]
43pub struct MonitoredStateStore<S> {
44    inner: Box<S>,
45
46    storage_metrics: Arc<MonitoredStorageMetrics>,
47}
48
49impl<S> MonitoredStateStore<S> {
50    pub fn new(inner: S, storage_metrics: Arc<MonitoredStorageMetrics>) -> Self {
51        Self {
52            inner: Box::new(inner),
53            storage_metrics,
54        }
55    }
56}
57
// Note: `monitored_iter` deliberately returns the named type `MonitoredStateStoreIter` rather than
// an opaque `impl StateStoreIter`. With the named type, the returned iterator carries only the
// lifetimes of the wrapped iterator `I`. An opaque return type would also make the compiler
// capture the lifetime `'a` of the `&'a self` borrow that is in scope.
62impl<S> MonitoredStateStore<S> {
63    async fn monitored_iter<
64        'a,
65        Item: IterItem,
66        I: StateStoreIter<Item> + 'a,
67        Stat: StateStoreIterStatsTrait<Item = Item>,
68    >(
69        &'a self,
70        table_id: TableId,
71        iter_stream_future: impl Future<Output = StorageResult<I>> + 'a,
72    ) -> StorageResult<MonitoredStateStoreIter<Item, I, Stat>> {
73        // start time takes iterator build time into account
74        // wait for iterator creation (e.g. seek)
75        let start_time = Instant::now();
76        let iter_stream = iter_stream_future
77            .await
78            .inspect_err(|e| error!(error = %e.as_report(), "Failed in iter"))?;
79        let iter_init_duration = start_time.elapsed();
80
81        // create a monitored iterator to collect metrics
82        let monitored = MonitoredStateStoreIter {
83            inner: iter_stream,
84            stats: MonitoredStateStoreIterStats {
85                inner: Stat::new(table_id.table_id, &self.storage_metrics, iter_init_duration),
86                table_id: table_id.table_id,
87                metrics: self.storage_metrics.clone(),
88            },
89            _phantom: PhantomData,
90        };
91        Ok(monitored)
92    }
93
94    pub fn inner(&self) -> &S {
95        &self.inner
96    }
97
98    async fn monitored_get(
99        &self,
100        get_future: impl Future<Output = StorageResult<Option<Bytes>>>,
101        table_id: TableId,
102        key_len: usize,
103    ) -> StorageResult<Option<Bytes>> {
104        let mut stats =
105            MonitoredStateStoreGetStats::new(table_id.table_id, self.storage_metrics.clone());
106
107        let value = get_future
108            .instrument_await("store_get".verbose())
109            .instrument(tracing::trace_span!("store_get"))
110            .await
111            .inspect_err(|e| error!(error = %e.as_report(), "Failed in get"))?;
112
113        stats.get_key_size = key_len;
114        if let Some(value) = value.as_ref() {
115            stats.get_value_size = value.len();
116        }
117        stats.report();
118
119        Ok(value)
120    }
121
122    async fn monitored_get_keyed_row(
123        &self,
124        get_keyed_row_future: impl Future<Output = StorageResult<Option<StateStoreKeyedRow>>>,
125        table_id: TableId,
126        key_len: usize,
127    ) -> StorageResult<Option<StateStoreKeyedRow>> {
128        let mut stats =
129            MonitoredStateStoreGetStats::new(table_id.table_id, self.storage_metrics.clone());
130
131        let value = get_keyed_row_future
132            .instrument_await("store_get_keyed_row".verbose())
133            .instrument(tracing::trace_span!("store_get_keyed_row"))
134            .await
135            .inspect_err(|e| error!(error = %e.as_report(), "Failed in get"))?;
136
137        stats.get_key_size = key_len;
138        if let Some((_, value)) = value.as_ref() {
139            stats.get_value_size = value.len();
140        }
141        stats.report();
142
143        Ok(value)
144    }
145}
146
147impl<S: StateStoreRead> StateStoreRead for MonitoredStateStore<S> {
148    type Iter = impl StateStoreReadIter;
149    type RevIter = impl StateStoreReadIter;
150
151    fn get_keyed_row(
152        &self,
153        key: TableKey<Bytes>,
154        read_options: ReadOptions,
155    ) -> impl Future<Output = StorageResult<Option<StateStoreKeyedRow>>> + '_ {
156        let table_id = read_options.table_id;
157        let key_len = key.len();
158        self.monitored_get_keyed_row(
159            self.inner.get_keyed_row(key, read_options),
160            table_id,
161            key_len,
162        )
163    }
164
165    fn iter(
166        &self,
167        key_range: TableKeyRange,
168        read_options: ReadOptions,
169    ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
170        self.monitored_iter::<'_, _, _, StateStoreIterStats>(
171            read_options.table_id,
172            self.inner.iter(key_range, read_options),
173        )
174    }
175
176    fn rev_iter(
177        &self,
178        key_range: TableKeyRange,
179        read_options: ReadOptions,
180    ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
181        self.monitored_iter::<'_, _, _, StateStoreIterStats>(
182            read_options.table_id,
183            self.inner.rev_iter(key_range, read_options),
184        )
185    }
186}
187
188impl<S: StateStoreReadLog> StateStoreReadLog for MonitoredStateStore<S> {
189    type ChangeLogIter = impl StateStoreReadChangeLogIter;
190
191    fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> impl StorageFuture<'_, u64> {
192        self.inner.next_epoch(epoch, options)
193    }
194
195    fn iter_log(
196        &self,
197        epoch_range: (u64, u64),
198        key_range: TableKeyRange,
199        options: ReadLogOptions,
200    ) -> impl Future<Output = StorageResult<Self::ChangeLogIter>> + Send + '_ {
201        self.monitored_iter::<'_, _, _, StateStoreIterLogStats>(
202            options.table_id,
203            self.inner.iter_log(epoch_range, key_range, options),
204        )
205    }
206}
207
208impl<S: LocalStateStore> LocalStateStore for MonitoredStateStore<S> {
209    type FlushedSnapshotReader = MonitoredStateStore<S::FlushedSnapshotReader>;
210
211    type Iter<'a> = impl StateStoreIter + 'a;
212    type RevIter<'a> = impl StateStoreIter + 'a;
213
214    fn get(
215        &self,
216        key: TableKey<Bytes>,
217        read_options: ReadOptions,
218    ) -> impl Future<Output = StorageResult<Option<Bytes>>> + Send + '_ {
219        let table_id = read_options.table_id;
220        let key_len = key.len();
221        // TODO: may collect the metrics as local
222        self.monitored_get(self.inner.get(key, read_options), table_id, key_len)
223    }
224
225    fn iter(
226        &self,
227        key_range: TableKeyRange,
228        read_options: ReadOptions,
229    ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
230        let table_id = read_options.table_id;
231        self.monitored_iter::<'_, _, _, StateStoreIterStats>(
232            table_id,
233            self.inner.iter(key_range, read_options),
234        )
235    }
236
237    fn rev_iter(
238        &self,
239        key_range: TableKeyRange,
240        read_options: ReadOptions,
241    ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
242        let table_id = read_options.table_id;
243        self.monitored_iter::<'_, _, _, StateStoreIterStats>(
244            table_id,
245            self.inner.rev_iter(key_range, read_options),
246        )
247    }
248
249    fn insert(
250        &mut self,
251        key: TableKey<Bytes>,
252        new_val: Bytes,
253        old_val: Option<Bytes>,
254    ) -> StorageResult<()> {
255        // TODO: collect metrics
256        self.inner.insert(key, new_val, old_val)
257    }
258
259    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
260        // TODO: collect metrics
261        self.inner.delete(key, old_val)
262    }
263
264    fn flush(&mut self) -> impl Future<Output = StorageResult<usize>> + Send + '_ {
265        self.inner.flush().instrument_await("store_flush".verbose())
266    }
267
268    fn epoch(&self) -> u64 {
269        self.inner.epoch()
270    }
271
272    fn is_dirty(&self) -> bool {
273        self.inner.is_dirty()
274    }
275
276    async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
277        self.inner.init(options).await
278    }
279
280    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
281        // TODO: may collect metrics
282        self.inner.seal_current_epoch(next_epoch, opts)
283    }
284
285    fn try_flush(&mut self) -> impl Future<Output = StorageResult<()>> + Send + '_ {
286        self.inner
287            .try_flush()
288            .instrument_await("store_try_flush".verbose())
289    }
290
291    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
292        self.inner.update_vnode_bitmap(vnodes).await
293    }
294
295    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
296        self.inner.get_table_watermark(vnode)
297    }
298
299    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
300        MonitoredStateStore::new(
301            self.inner.new_flushed_snapshot_reader(),
302            self.storage_metrics.clone(),
303        )
304    }
305}
306
307impl<S: StateStore> StateStore for MonitoredStateStore<S> {
308    type Local = MonitoredStateStore<S::Local>;
309    type ReadSnapshot = MonitoredStateStore<S::ReadSnapshot>;
310
311    fn try_wait_epoch(
312        &self,
313        epoch: HummockReadEpoch,
314        options: TryWaitEpochOptions,
315    ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
316        self.inner
317            .try_wait_epoch(epoch, options)
318            .instrument_await("store_wait_epoch".verbose())
319            .inspect_err(|e| error!(error = %e.as_report(), "Failed in wait_epoch"))
320    }
321
322    fn monitored(
323        self,
324        _storage_metrics: Arc<MonitoredStorageMetrics>,
325    ) -> MonitoredStateStore<Self> {
326        panic!("the state store is already monitored")
327    }
328
329    async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
330        MonitoredStateStore::new(
331            self.inner
332                .new_local(option)
333                .instrument_await("store_new_local")
334                .await,
335            self.storage_metrics.clone(),
336        )
337    }
338
339    async fn new_read_snapshot(
340        &self,
341        epoch: HummockReadEpoch,
342        options: NewReadSnapshotOptions,
343    ) -> StorageResult<Self::ReadSnapshot> {
344        Ok(MonitoredStateStore::new(
345            self.inner.new_read_snapshot(epoch, options).await?,
346            self.storage_metrics.clone(),
347        ))
348    }
349}
350
351impl MonitoredStateStore<HummockStorage> {
352    pub fn sstable_store(&self) -> SstableStoreRef {
353        self.inner.sstable_store()
354    }
355
356    pub fn sstable_object_id_manager(&self) -> SstableObjectIdManagerRef {
357        self.inner.sstable_object_id_manager().clone()
358    }
359}
360
361impl<S: AsHummock> AsHummock for MonitoredStateStore<S> {
362    fn as_hummock(&self) -> Option<&HummockStorage> {
363        self.inner.as_hummock()
364    }
365
366    fn sync(
367        &self,
368        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
369    ) -> BoxFuture<'_, StorageResult<SyncResult>> {
370        async move {
371            let future = self
372                .inner
373                .sync(sync_table_epochs)
374                .instrument_await("store_sync");
375            let timer = self.storage_metrics.sync_duration.start_timer();
376            let sync_size = self.storage_metrics.sync_size.clone();
377            let sync_result = future
378                .await
379                .inspect_err(|e| error!(error = %e.as_report(), "Failed in sync"))?;
380            timer.observe_duration();
381            if sync_result.sync_size != 0 {
382                sync_size.observe(sync_result.sync_size as _);
383            }
384            Ok(sync_result)
385        }
386        .boxed()
387    }
388}
389
390/// A state store iterator wrapper for monitoring metrics.
391pub(crate) struct MonitoredStateStoreIter<
392    Item: IterItem,
393    I,
394    S: StateStoreIterStatsTrait<Item = Item>,
395> {
396    inner: I,
397    stats: MonitoredStateStoreIterStats<S>,
398    _phantom: PhantomData<Item>,
399}
400
401impl<Item: IterItem, I: StateStoreIter<Item>, S: StateStoreIterStatsTrait<Item = Item>>
402    StateStoreIter<Item> for MonitoredStateStoreIter<Item, I, S>
403{
404    async fn try_next(&mut self) -> StorageResult<Option<Item::ItemRef<'_>>> {
405        if let Some(item) = self
406            .inner
407            .try_next()
408            .instrument(tracing::trace_span!("store_iter_try_next"))
409            .await
410            .inspect_err(|e| error!(error = %e.as_report(), "Failed in next"))?
411        {
412            self.stats.inner.observe(item);
413            Ok(Some(item))
414        } else {
415            Ok(None)
416        }
417    }
418}