risingwave_storage/store/
auto_rebuild.rs1use std::ops::Bound;
16use std::sync::Arc;
17
18use bytes::Bytes;
19use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
20
21use super::{ReadOptions, StateStoreIter, StateStoreKeyedRowRef, StateStoreRead};
22use crate::error::StorageResult;
23
24pub struct AutoRebuildStateStoreReadIter<S: StateStoreRead, F> {
25 state_store: Arc<S>,
26 iter: S::Iter,
27 should_rebuild: F,
29 end_bound: Bound<TableKey<Bytes>>,
30 options: ReadOptions,
31}
32
33impl<S: StateStoreRead, F: FnMut() -> bool> AutoRebuildStateStoreReadIter<S, F> {
34 pub async fn new(
35 state_store: Arc<S>,
36 should_rebuild: F,
37 range: TableKeyRange,
38 options: ReadOptions,
39 ) -> StorageResult<Self> {
40 let (start_bound, end_bound) = range;
41 let iter = state_store
42 .iter((start_bound, end_bound.clone()), options.clone())
43 .await?;
44 Ok(Self {
45 state_store,
46 iter,
47 should_rebuild,
48 end_bound,
49 options,
50 })
51 }
52}
53
54impl<S: StateStoreRead, F: FnMut() -> bool + Send> StateStoreIter
55 for AutoRebuildStateStoreReadIter<S, F>
56{
57 async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
58 let should_rebuild = (self.should_rebuild)();
59 if should_rebuild {
60 let Some((key, _value)) = self.iter.try_next().await? else {
61 return Ok(None);
62 };
63 let key: FullKey<&[u8]> = key;
64 let range_start = Bytes::copy_from_slice(key.user_key.table_key.as_ref());
65 let new_iter = self
66 .state_store
67 .iter(
68 (
69 Bound::Included(TableKey(range_start.clone())),
70 self.end_bound.clone(),
71 ),
72 self.options.clone(),
73 )
74 .await?;
75 self.iter = new_iter;
76 let item: Option<StateStoreKeyedRowRef<'_>> = self.iter.try_next().await?;
77 if let Some((key, value)) = item {
78 assert_eq!(
79 key.user_key.table_key.0,
80 range_start.as_ref(),
81 "the first key should be the previous key"
82 );
83 Ok(Some((key, value)))
84 } else {
85 unreachable!(
86 "the first key should be the previous key {:?}, but get None",
87 range_start
88 )
89 }
90 } else {
91 self.iter.try_next().await
92 }
93 }
94}
95
96pub mod timeout_auto_rebuild {
97 use std::sync::Arc;
98 use std::time::{Duration, Instant};
99
100 use risingwave_common::catalog::TableId;
101 use risingwave_hummock_sdk::key::TableKeyRange;
102 use tracing::info;
103
104 use super::{AutoRebuildStateStoreReadIter, ReadOptions, StateStoreRead};
105 use crate::error::StorageResult;
106
107 pub type TimeoutAutoRebuildIter<S: StateStoreRead> =
108 AutoRebuildStateStoreReadIter<S, impl FnMut() -> bool + Send>;
109
110 #[define_opaque(TimeoutAutoRebuildIter)]
111 pub async fn iter_with_timeout_rebuild<S: StateStoreRead>(
112 state_store: Arc<S>,
113 range: TableKeyRange,
114 table_id: TableId,
115 options: ReadOptions,
116 timeout: Duration,
117 ) -> StorageResult<TimeoutAutoRebuildIter<S>> {
118 const CHECK_TIMEOUT_PERIOD: usize = 100;
119 struct Count(usize);
121 let mut check_count = Count(0);
122 let mut total_count = Count(0);
123 let mut curr_iter_item_count = Count(0);
124 let mut start_time = Instant::now();
125 let initial_start_time = start_time;
126 AutoRebuildStateStoreReadIter::new(
127 state_store,
128 move || {
129 check_count.0 += 1;
130 curr_iter_item_count.0 += 1;
131 total_count.0 += 1;
132 if check_count.0 == CHECK_TIMEOUT_PERIOD {
133 check_count.0 = 0;
134 if start_time.elapsed() > timeout {
135 let prev_iter_item_count = curr_iter_item_count.0;
136 curr_iter_item_count.0 = 0;
137 start_time = Instant::now();
138 info!(
139 %table_id,
140 iter_exist_time_secs = initial_start_time.elapsed().as_secs(),
141 prev_iter_item_count,
142 total_iter_item_count = total_count.0,
143 "kv log store iter is rebuilt"
144 );
145 true
146 } else {
147 false
148 }
149 } else {
150 false
151 }
152 },
153 range,
154 options,
155 )
156 .await
157 }
158}