risingwave_storage/store/
auto_rebuild.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Bound;
16use std::sync::Arc;
17
18use bytes::Bytes;
19use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
20
21use super::{ReadOptions, StateStoreIter, StateStoreKeyedRowRef, StateStoreRead};
22use crate::error::StorageResult;
23
24pub struct AutoRebuildStateStoreReadIter<S: StateStoreRead, F> {
25    state_store: Arc<S>,
26    iter: S::Iter,
27    // closure decides whether to rebuild the iterator; it should reset itself after returning true.
28    should_rebuild: F,
29    end_bound: Bound<TableKey<Bytes>>,
30    options: ReadOptions,
31}
32
33impl<S: StateStoreRead, F: FnMut() -> bool> AutoRebuildStateStoreReadIter<S, F> {
34    pub async fn new(
35        state_store: Arc<S>,
36        should_rebuild: F,
37        range: TableKeyRange,
38        options: ReadOptions,
39    ) -> StorageResult<Self> {
40        let (start_bound, end_bound) = range;
41        let iter = state_store
42            .iter((start_bound, end_bound.clone()), options.clone())
43            .await?;
44        Ok(Self {
45            state_store,
46            iter,
47            should_rebuild,
48            end_bound,
49            options,
50        })
51    }
52}
53
54impl<S: StateStoreRead, F: FnMut() -> bool + Send> StateStoreIter
55    for AutoRebuildStateStoreReadIter<S, F>
56{
57    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
58        let should_rebuild = (self.should_rebuild)();
59        if should_rebuild {
60            let Some((key, _value)) = self.iter.try_next().await? else {
61                return Ok(None);
62            };
63            let key: FullKey<&[u8]> = key;
64            let range_start = Bytes::copy_from_slice(key.user_key.table_key.as_ref());
65            let new_iter = self
66                .state_store
67                .iter(
68                    (
69                        Bound::Included(TableKey(range_start.clone())),
70                        self.end_bound.clone(),
71                    ),
72                    self.options.clone(),
73                )
74                .await?;
75            self.iter = new_iter;
76            let item: Option<StateStoreKeyedRowRef<'_>> = self.iter.try_next().await?;
77            if let Some((key, value)) = item {
78                assert_eq!(
79                    key.user_key.table_key.0,
80                    range_start.as_ref(),
81                    "the first key should be the previous key"
82                );
83                Ok(Some((key, value)))
84            } else {
85                unreachable!(
86                    "the first key should be the previous key {:?}, but get None",
87                    range_start
88                )
89            }
90        } else {
91            self.iter.try_next().await
92        }
93    }
94}
95
96pub mod timeout_auto_rebuild {
97    use std::sync::Arc;
98    use std::time::{Duration, Instant};
99
100    use risingwave_common::catalog::TableId;
101    use risingwave_hummock_sdk::key::TableKeyRange;
102    use tracing::info;
103
104    use super::{AutoRebuildStateStoreReadIter, ReadOptions, StateStoreRead};
105    use crate::error::StorageResult;
106
107    pub type TimeoutAutoRebuildIter<S: StateStoreRead> =
108        AutoRebuildStateStoreReadIter<S, impl FnMut() -> bool + Send>;
109
110    #[define_opaque(TimeoutAutoRebuildIter)]
111    pub async fn iter_with_timeout_rebuild<S: StateStoreRead>(
112        state_store: Arc<S>,
113        range: TableKeyRange,
114        table_id: TableId,
115        options: ReadOptions,
116        timeout: Duration,
117    ) -> StorageResult<TimeoutAutoRebuildIter<S>> {
118        const CHECK_TIMEOUT_PERIOD: usize = 100;
119        // use a struct here to avoid accidental copy instead of move on primitive usize
120        struct Count(usize);
121        let mut check_count = Count(0);
122        let mut total_count = Count(0);
123        let mut curr_iter_item_count = Count(0);
124        let mut start_time = Instant::now();
125        let initial_start_time = start_time;
126        AutoRebuildStateStoreReadIter::new(
127            state_store,
128            move || {
129                check_count.0 += 1;
130                curr_iter_item_count.0 += 1;
131                total_count.0 += 1;
132                if check_count.0 == CHECK_TIMEOUT_PERIOD {
133                    check_count.0 = 0;
134                    if start_time.elapsed() > timeout {
135                        let prev_iter_item_count = curr_iter_item_count.0;
136                        curr_iter_item_count.0 = 0;
137                        start_time = Instant::now();
138                        info!(
139                            %table_id,
140                            iter_exist_time_secs = initial_start_time.elapsed().as_secs(),
141                            prev_iter_item_count,
142                            total_iter_item_count = total_count.0,
143                            "kv log store iter is rebuilt"
144                        );
145                        true
146                    } else {
147                        false
148                    }
149                } else {
150                    false
151                }
152            },
153            range,
154            options,
155        )
156        .await
157    }
158}