risingwave_storage/monitor/
monitored_store.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::marker::PhantomData;
17use std::sync::Arc;
18
19use await_tree::{InstrumentAwait, SpanExt};
20use bytes::Bytes;
21use futures::future::BoxFuture;
22use futures::{Future, FutureExt, TryFutureExt};
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::TableId;
25use risingwave_common::hash::VirtualNode;
26use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
27use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch, SyncResult};
28use thiserror_ext::AsReport;
29use tokio::time::Instant;
30use tracing::{Instrument, error};
31
32use super::{MonitoredStateStoreGetStats, MonitoredStateStoreIterStats, MonitoredStorageMetrics};
33use crate::error::StorageResult;
34use crate::hummock::sstable_store::SstableStoreRef;
35use crate::hummock::{HummockStorage, ObjectIdManagerRef};
36use crate::monitor::monitored_storage_metrics::StateStoreIterStats;
37use crate::monitor::{StateStoreIterLogStats, StateStoreIterStatsTrait};
38use crate::store::*;
39use crate::store_impl::AsHummock;
40
/// A state store wrapper for monitoring metrics.
///
/// `S` is the type of the wrapped store. `E` is an extra piece of context carried alongside
/// it and defaults to `()`.
#[derive(Clone)]
pub struct MonitoredStateStore<S, E = ()> {
    /// The wrapped state store, kept behind a `Box`.
    inner: Box<S>,
    /// Shared handle to the metrics this wrapper reports to.
    storage_metrics: Arc<MonitoredStorageMetrics>,
    /// Extra context: `()` for the plain wrapper, a `TableId` for `MonitoredTableStateStore`.
    extra: E,
}
48
/// A [`MonitoredStateStore`] whose `extra` slot holds a [`TableId`].
type MonitoredTableStateStore<S> = MonitoredStateStore<S, TableId>;
50
51impl<S> MonitoredStateStore<S> {
52    pub fn new(inner: S, storage_metrics: Arc<MonitoredStorageMetrics>) -> Self {
53        Self {
54            inner: Box::new(inner),
55            storage_metrics,
56            extra: (),
57        }
58    }
59
60    pub fn inner(&self) -> &S {
61        &self.inner
62    }
63}
64
65impl<S> MonitoredTableStateStore<S> {
66    fn new(inner: S, storage_metrics: Arc<MonitoredStorageMetrics>, table_id: TableId) -> Self {
67        Self {
68            inner: Box::new(inner),
69            storage_metrics,
70            extra: table_id,
71        }
72    }
73
74    fn table_id(&self) -> TableId {
75        self.extra
76    }
77}
78
// Note: it is important to define the `MonitoredStateStoreIter` type alias, as it marks that
// the return type of `monitored_iter` only captures the lifetime `'s` and has nothing to do with
// `'a`. If we simply used `impl StateStoreIter + 's`, the Rust compiler would also capture
// the lifetime `'a`, because it is in scope where the return type is declared.
83impl<S, E> MonitoredStateStore<S, E> {
84    async fn monitored_iter<
85        'a,
86        Item: IterItem,
87        I: StateStoreIter<Item> + 'a,
88        Stat: StateStoreIterStatsTrait<Item = Item>,
89    >(
90        &'a self,
91        table_id: TableId,
92        iter_stream_future: impl Future<Output = StorageResult<I>> + 'a,
93    ) -> StorageResult<MonitoredStateStoreIter<Item, I, Stat>> {
94        // start time takes iterator build time into account
95        // wait for iterator creation (e.g. seek)
96        let start_time = Instant::now();
97        let iter_stream = iter_stream_future
98            .await
99            .inspect_err(|e| error!(error = %e.as_report(), "Failed in iter"))?;
100        let iter_init_duration = start_time.elapsed();
101
102        // create a monitored iterator to collect metrics
103        let monitored = MonitoredStateStoreIter {
104            inner: iter_stream,
105            stats: MonitoredStateStoreIterStats {
106                inner: Stat::new(table_id.table_id, &self.storage_metrics, iter_init_duration),
107                table_id: table_id.table_id,
108                metrics: self.storage_metrics.clone(),
109            },
110            _phantom: PhantomData,
111        };
112        Ok(monitored)
113    }
114
115    async fn monitored_on_key_value<O>(
116        &self,
117        on_key_value_future: impl Future<Output = StorageResult<Option<(O, usize)>>>,
118        table_id: TableId,
119        key_len: usize,
120    ) -> StorageResult<Option<O>> {
121        let mut stats =
122            MonitoredStateStoreGetStats::new(table_id.table_id, self.storage_metrics.clone());
123
124        let value = on_key_value_future
125            .instrument_await("store_on_key_value".verbose())
126            .instrument(tracing::trace_span!("store_on_key_value"))
127            .await
128            .inspect_err(|e| error!(error = %e.as_report(), "Failed in get"))?;
129
130        stats.get_key_size = key_len;
131        let value = value.map(|(value, value_len)| {
132            stats.get_value_size = value_len;
133            value
134        });
135        stats.report();
136
137        Ok(value)
138    }
139}
140
141impl<S: StateStoreGet> StateStoreGet for MonitoredTableStateStore<S> {
142    fn on_key_value<O: Send + 'static>(
143        &self,
144        key: TableKey<Bytes>,
145        read_options: ReadOptions,
146        on_key_value_fn: impl KeyValueFn<O>,
147    ) -> impl StorageFuture<'_, Option<O>> {
148        let table_id = self.table_id();
149        let key_len = key.len();
150        self.monitored_on_key_value(
151            self.inner
152                .on_key_value(key, read_options, move |key, value| {
153                    let result = on_key_value_fn(key, value);
154                    result.map(|output| (output, value.len()))
155                }),
156            table_id,
157            key_len,
158        )
159    }
160}
161
162impl<S: StateStoreRead> StateStoreRead for MonitoredTableStateStore<S> {
163    type Iter = impl StateStoreReadIter;
164    type RevIter = impl StateStoreReadIter;
165
166    fn iter(
167        &self,
168        key_range: TableKeyRange,
169        read_options: ReadOptions,
170    ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
171        self.monitored_iter::<'_, _, _, StateStoreIterStats>(
172            self.table_id(),
173            self.inner.iter(key_range, read_options),
174        )
175    }
176
177    fn rev_iter(
178        &self,
179        key_range: TableKeyRange,
180        read_options: ReadOptions,
181    ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
182        self.monitored_iter::<'_, _, _, StateStoreIterStats>(
183            self.table_id(),
184            self.inner.rev_iter(key_range, read_options),
185        )
186    }
187}
188
189impl<S: StateStoreReadLog> StateStoreReadLog for MonitoredStateStore<S> {
190    type ChangeLogIter = impl StateStoreReadChangeLogIter;
191
192    fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> impl StorageFuture<'_, u64> {
193        self.inner.next_epoch(epoch, options)
194    }
195
196    fn iter_log(
197        &self,
198        epoch_range: (u64, u64),
199        key_range: TableKeyRange,
200        options: ReadLogOptions,
201    ) -> impl Future<Output = StorageResult<Self::ChangeLogIter>> + Send + '_ {
202        self.monitored_iter::<'_, _, _, StateStoreIterLogStats>(
203            options.table_id,
204            self.inner.iter_log(epoch_range, key_range, options),
205        )
206    }
207}
208
impl<S: StateStoreReadVector> StateStoreReadVector for MonitoredTableStateStore<S> {
    /// Forwards the query to the wrapped store with all arguments unchanged.
    ///
    /// No metrics are recorded for this call yet.
    fn nearest<O: Send + 'static>(
        &self,
        vec: Vector,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<O>,
    ) -> impl StorageFuture<'_, Vec<O>> {
        // TODO: monitor
        self.inner.nearest(vec, options, on_nearest_item_fn)
    }
}
220
221impl<S: LocalStateStore> LocalStateStore for MonitoredTableStateStore<S> {
222    type FlushedSnapshotReader = MonitoredTableStateStore<S::FlushedSnapshotReader>;
223
224    type Iter<'a> = impl StateStoreIter + 'a;
225    type RevIter<'a> = impl StateStoreIter + 'a;
226
227    fn iter(
228        &self,
229        key_range: TableKeyRange,
230        read_options: ReadOptions,
231    ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
232        self.monitored_iter::<'_, _, _, StateStoreIterStats>(
233            self.table_id(),
234            self.inner.iter(key_range, read_options),
235        )
236    }
237
238    fn rev_iter(
239        &self,
240        key_range: TableKeyRange,
241        read_options: ReadOptions,
242    ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
243        self.monitored_iter::<'_, _, _, StateStoreIterStats>(
244            self.table_id(),
245            self.inner.rev_iter(key_range, read_options),
246        )
247    }
248
249    fn insert(
250        &mut self,
251        key: TableKey<Bytes>,
252        new_val: Bytes,
253        old_val: Option<Bytes>,
254    ) -> StorageResult<()> {
255        // TODO: collect metrics
256        self.inner.insert(key, new_val, old_val)
257    }
258
259    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
260        // TODO: collect metrics
261        self.inner.delete(key, old_val)
262    }
263
264    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
265        self.inner.get_table_watermark(vnode)
266    }
267
268    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
269        MonitoredTableStateStore::new(
270            self.inner.new_flushed_snapshot_reader(),
271            self.storage_metrics.clone(),
272            self.table_id(),
273        )
274    }
275
276    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
277        self.inner.update_vnode_bitmap(vnodes).await
278    }
279}
280
281impl<S: StateStoreWriteEpochControl> StateStoreWriteEpochControl for MonitoredTableStateStore<S> {
282    fn flush(&mut self) -> impl Future<Output = StorageResult<usize>> + Send + '_ {
283        self.inner.flush().instrument_await("store_flush".verbose())
284    }
285
286    async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
287        self.inner.init(options).await
288    }
289
290    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
291        // TODO: may collect metrics
292        self.inner.seal_current_epoch(next_epoch, opts)
293    }
294
295    fn try_flush(&mut self) -> impl Future<Output = StorageResult<()>> + Send + '_ {
296        self.inner
297            .try_flush()
298            .instrument_await("store_try_flush".verbose())
299    }
300}
301
impl<S: StateStoreWriteVector> StateStoreWriteVector for MonitoredTableStateStore<S> {
    /// Forwards the vector insert to the wrapped store with both arguments unchanged.
    ///
    /// No metrics are recorded for this call yet.
    fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()> {
        // TODO: monitor
        self.inner.insert(vec, info)
    }
}
308
309impl<S: StateStore> StateStore for MonitoredStateStore<S> {
310    type Local = MonitoredTableStateStore<S::Local>;
311    type ReadSnapshot = MonitoredTableStateStore<S::ReadSnapshot>;
312    type VectorWriter = MonitoredTableStateStore<S::VectorWriter>;
313
314    fn try_wait_epoch(
315        &self,
316        epoch: HummockReadEpoch,
317        options: TryWaitEpochOptions,
318    ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
319        self.inner
320            .try_wait_epoch(epoch, options)
321            .instrument_await("store_wait_epoch".verbose())
322            .inspect_err(|e| error!(error = %e.as_report(), "Failed in wait_epoch"))
323    }
324
325    fn monitored(
326        self,
327        _storage_metrics: Arc<MonitoredStorageMetrics>,
328    ) -> MonitoredStateStore<Self> {
329        panic!("the state store is already monitored")
330    }
331
332    async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
333        let table_id = option.table_id;
334        MonitoredTableStateStore::new(
335            self.inner
336                .new_local(option)
337                .instrument_await("store_new_local")
338                .await,
339            self.storage_metrics.clone(),
340            table_id,
341        )
342    }
343
344    async fn new_read_snapshot(
345        &self,
346        epoch: HummockReadEpoch,
347        options: NewReadSnapshotOptions,
348    ) -> StorageResult<Self::ReadSnapshot> {
349        Ok(MonitoredTableStateStore::new(
350            self.inner.new_read_snapshot(epoch, options).await?,
351            self.storage_metrics.clone(),
352            options.table_id,
353        ))
354    }
355
356    async fn new_vector_writer(&self, options: NewVectorWriterOptions) -> Self::VectorWriter {
357        let table_id = options.table_id;
358        MonitoredTableStateStore::new(
359            self.inner.new_vector_writer(options).await,
360            self.storage_metrics.clone(),
361            table_id,
362        )
363    }
364}
365
impl MonitoredStateStore<HummockStorage> {
    /// Returns the sstable store handle obtained from the wrapped `HummockStorage`.
    pub fn sstable_store(&self) -> SstableStoreRef {
        self.inner.sstable_store()
    }

    /// Returns a clone of the object id manager handle obtained from the wrapped
    /// `HummockStorage`.
    pub fn object_id_manager(&self) -> ObjectIdManagerRef {
        self.inner.object_id_manager().clone()
    }
}
375
376impl<S: AsHummock> AsHummock for MonitoredStateStore<S> {
377    fn as_hummock(&self) -> Option<&HummockStorage> {
378        self.inner.as_hummock()
379    }
380
381    fn sync(
382        &self,
383        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
384    ) -> BoxFuture<'_, StorageResult<SyncResult>> {
385        async move {
386            let future = self
387                .inner
388                .sync(sync_table_epochs)
389                .instrument_await("store_sync");
390            let timer = self.storage_metrics.sync_duration.start_timer();
391            let sync_size = self.storage_metrics.sync_size.clone();
392            let sync_result = future
393                .await
394                .inspect_err(|e| error!(error = %e.as_report(), "Failed in sync"))?;
395            timer.observe_duration();
396            if sync_result.sync_size != 0 {
397                sync_size.observe(sync_result.sync_size as _);
398            }
399            Ok(sync_result)
400        }
401        .boxed()
402    }
403}
404
405/// A state store iterator wrapper for monitoring metrics.
406pub(crate) struct MonitoredStateStoreIter<
407    Item: IterItem,
408    I,
409    S: StateStoreIterStatsTrait<Item = Item>,
410> {
411    inner: I,
412    stats: MonitoredStateStoreIterStats<S>,
413    _phantom: PhantomData<Item>,
414}
415
416impl<Item: IterItem, I: StateStoreIter<Item>, S: StateStoreIterStatsTrait<Item = Item>>
417    StateStoreIter<Item> for MonitoredStateStoreIter<Item, I, S>
418{
419    async fn try_next(&mut self) -> StorageResult<Option<Item::ItemRef<'_>>> {
420        if let Some(item) = self
421            .inner
422            .try_next()
423            .instrument(tracing::trace_span!("store_iter_try_next"))
424            .await
425            .inspect_err(|e| error!(error = %e.as_report(), "Failed in next"))?
426        {
427            self.stats.inner.observe(item);
428            Ok(Some(item))
429        } else {
430            Ok(None)
431        }
432    }
433}