risingwave_stream/common/table/
state_table_cache.rs1use risingwave_common::array::Op;
16use risingwave_common::row::{OwnedRow, Row, RowExt};
17use risingwave_common::types::{DefaultOrdered, ScalarRefImpl};
18use risingwave_common_estimate_size::EstimateSize;
19
20use crate::common::state_cache::{StateCache, TopNStateCache};
21
22type WatermarkCacheKey = DefaultOrdered<OwnedRow>;
32
33#[derive(EstimateSize, Clone)]
81pub struct StateTableWatermarkCache {
82 inner: TopNStateCache<WatermarkCacheKey, ()>,
83}
84
85impl StateTableWatermarkCache {
86 pub fn new(size: usize) -> Self {
87 Self {
88 inner: TopNStateCache::new(size),
89 }
90 }
91
92 #[allow(dead_code)]
96 fn new_with_row_count(size: usize, row_count: usize) -> Self {
97 Self {
98 inner: TopNStateCache::with_table_row_count(size, row_count),
99 }
100 }
101
102 fn first_key(&self) -> Option<&WatermarkCacheKey> {
104 self.inner.first_key_value().map(|(k, _)| k)
105 }
106
107 pub fn lowest_key(&self) -> Option<ScalarRefImpl<'_>> {
109 self.first_key().and_then(|k| k.0.datum_at(0))
110 }
111
112 pub fn insert(&mut self, key: &impl Row) {
114 if !key.is_null_at(0) {
115 self.inner.insert(DefaultOrdered(key.into_owned_row()), ());
116 }
117 }
118
119 pub fn delete(&mut self, key: &impl Row) {
122 if !key.is_null_at(0) {
123 self.inner.delete(&DefaultOrdered(key.into_owned_row()));
124 }
125 }
126
127 pub fn capacity(&self) -> usize {
128 self.inner.capacity()
129 }
130
131 pub fn len(&self) -> usize {
132 self.inner.len()
133 }
134
135 pub fn set_table_row_count(&mut self, table_row_count: usize) {
136 self.inner.set_table_row_count(table_row_count)
137 }
138
139 #[cfg(test)]
140 pub fn get_table_row_count(&self) -> &Option<usize> {
141 self.inner.get_table_row_count()
142 }
143}
144
145impl StateCache for StateTableWatermarkCache {
146 type Filler<'a> = &'a mut TopNStateCache<WatermarkCacheKey, ()>;
147 type Key = WatermarkCacheKey;
148 type Value = ();
149
150 fn is_synced(&self) -> bool {
151 self.inner.is_synced()
152 }
153
154 fn begin_syncing(&mut self) -> Self::Filler<'_> {
155 self.inner.begin_syncing()
156 }
157
158 fn insert(&mut self, key: Self::Key, value: Self::Value) -> Option<Self::Value> {
159 self.inner.insert(key, value)
160 }
161
162 fn delete(&mut self, key: &Self::Key) -> Option<Self::Value> {
163 self.inner.delete(key)
164 }
165
166 fn apply_batch(&mut self, batch: impl IntoIterator<Item = (Op, Self::Key, Self::Value)>) {
167 self.inner.apply_batch(batch)
168 }
169
170 fn clear(&mut self) {
171 self.inner.clear()
172 }
173
174 fn values(&self) -> impl Iterator<Item = &Self::Value> {
175 self.inner.values()
176 }
177
178 fn first_key_value(&self) -> Option<(&Self::Key, &Self::Value)> {
179 self.inner.first_key_value()
180 }
181}
182
183#[cfg(test)]
184mod tests {
185
186 use risingwave_common::types::{Scalar, Timestamptz};
187
188 use super::*;
189 use crate::common::state_cache::StateCacheFiller;
190
191 #[test]
203 fn test_state_table_watermark_cache_inserts() {
204 let v1 = [
205 Some(Timestamptz::from_secs(1000).unwrap().to_scalar_value()),
206 Some(Timestamptz::from_secs(1234).unwrap().to_scalar_value()),
207 ];
208 let v2 = [
209 Some(Timestamptz::from_secs(999).unwrap().to_scalar_value()),
210 Some(Timestamptz::from_secs(1234).unwrap().to_scalar_value()),
211 ];
212 let v3 = [
213 Some(Timestamptz::from_secs(2000).unwrap().to_scalar_value()),
214 Some(Timestamptz::from_secs(1234).unwrap().to_scalar_value()),
215 ];
216 let v4 = [
217 Some(Timestamptz::from_secs(900).unwrap().to_scalar_value()),
218 Some(Timestamptz::from_secs(1234).unwrap().to_scalar_value()),
219 ];
220 let v5 = [
221 Some(Timestamptz::from_secs(800).unwrap().to_scalar_value()),
222 Some(Timestamptz::from_secs(1234).unwrap().to_scalar_value()),
223 ];
224 let mut cache = StateTableWatermarkCache::new(3);
225 assert_eq!(cache.capacity(), 3);
226 let filler = cache.begin_syncing();
227 filler.finish();
228
229 cache.insert(&v1);
231 assert_eq!(cache.len(), 1);
232 assert_eq!(
233 cache.lowest_key(),
234 Some(v1[0].as_ref().unwrap().as_scalar_ref_impl())
235 );
236
237 cache.insert(&v2);
239 assert_eq!(cache.len(), 2);
240 assert_eq!(
241 cache.lowest_key(),
242 Some(v2[0].as_ref().unwrap().as_scalar_ref_impl())
243 );
244
245 cache.insert(&v3);
247 assert_eq!(cache.len(), 2);
248 assert_eq!(
249 cache.lowest_key(),
250 Some(v2[0].as_ref().unwrap().as_scalar_ref_impl())
251 );
252
253 cache.insert(&v4);
255 assert_eq!(cache.len(), 3);
256 assert_eq!(
257 cache.lowest_key(),
258 Some(v4[0].as_ref().unwrap().as_scalar_ref_impl())
259 );
260
261 cache.insert(&v5);
263 assert_eq!(cache.len(), 3);
264 assert_eq!(
265 cache.lowest_key(),
266 Some(v5[0].as_ref().unwrap().as_scalar_ref_impl())
267 );
268 }
269
270 #[test]
271 fn test_state_table_watermark_cache_delete_non_existent_value() {
272 let mut cache = StateTableWatermarkCache::new(3);
273 assert_eq!(cache.capacity(), 3);
274 let filler = cache.begin_syncing();
275 filler.finish();
276 let v1 = [
277 Some(Timestamptz::from_secs(1000).unwrap().to_scalar_value()),
278 Some(Timestamptz::from_secs(1234).unwrap().to_scalar_value()),
279 ];
280 cache.delete(&v1);
281 }
282
283 #[test]
291 fn test_state_table_watermark_cache_with_row_count_inserts() {
292 let mut cache = StateTableWatermarkCache::new_with_row_count(3, 0);
293 assert_eq!(cache.capacity(), 3);
294 let filler = cache.begin_syncing();
295 filler.finish();
296 assert!(cache.first_key_value().is_none());
297 assert!(cache.lowest_key().is_none());
298
299 let v1 = [
300 Some(Timestamptz::from_secs(1000).unwrap().to_scalar_value()),
301 Some(Timestamptz::from_secs(1000).unwrap().to_scalar_value()),
302 ];
303 cache.insert(&v1);
304 let lowest = cache.lowest_key().unwrap();
305 assert_eq!(lowest, v1[0].clone().unwrap().as_scalar_ref_impl());
306
307 let v2 = [
308 Some(Timestamptz::from_secs(999).unwrap().to_scalar_value()),
309 Some(Timestamptz::from_secs(1234).unwrap().to_scalar_value()),
310 ];
311 cache.insert(&v2);
312 assert_eq!(cache.len(), 2);
313 let lowest = cache.lowest_key().unwrap();
314 assert_eq!(lowest, v2[0].clone().unwrap().as_scalar_ref_impl());
315
316 let v3 = [
317 Some(Timestamptz::from_secs(2000).unwrap().to_scalar_value()),
318 Some(Timestamptz::from_secs(1234).unwrap().to_scalar_value()),
319 ];
320 cache.insert(&v3);
321 assert_eq!(cache.len(), 3);
322
323 let v4 = [
324 Some(Timestamptz::from_secs(3000).unwrap().to_scalar_value()),
325 Some(Timestamptz::from_secs(1234).unwrap().to_scalar_value()),
326 ];
327 cache.insert(&v4);
328 assert_eq!(cache.len(), 3);
329
330 let v5 = [
331 Some(Timestamptz::from_secs(900).unwrap().to_scalar_value()),
332 Some(Timestamptz::from_secs(1234).unwrap().to_scalar_value()),
333 ];
334 cache.insert(&v5);
335 assert_eq!(cache.len(), 3);
336 let lowest = cache.lowest_key().unwrap();
337 assert_eq!(lowest, v5[0].clone().unwrap().as_scalar_ref_impl());
338 }
339
340 #[test]
378 fn test_state_table_watermark_cache_with_row_count_deletes() {
379 let mut cache = StateTableWatermarkCache::new_with_row_count(3, 0);
381 assert_eq!(cache.capacity(), 3);
382 let filler = cache.begin_syncing();
383 filler.finish();
384 let v1 = [
385 Some(Timestamptz::from_secs(1000).unwrap().to_scalar_value()),
386 Some(Timestamptz::from_secs(1234).unwrap().to_scalar_value()),
387 ];
388 let v2 = [
389 Some(Timestamptz::from_secs(999).unwrap().to_scalar_value()),
390 Some(Timestamptz::from_secs(1234).unwrap().to_scalar_value()),
391 ];
392 let v3 = [
393 Some(Timestamptz::from_secs(2000).unwrap().to_scalar_value()),
394 Some(Timestamptz::from_secs(1234).unwrap().to_scalar_value()),
395 ];
396 let v4 = [
397 Some(Timestamptz::from_secs(3000).unwrap().to_scalar_value()),
398 Some(Timestamptz::from_secs(1234).unwrap().to_scalar_value()),
399 ];
400 let v5 = [
401 Some(Timestamptz::from_secs(900).unwrap().to_scalar_value()),
402 Some(Timestamptz::from_secs(1234).unwrap().to_scalar_value()),
403 ];
404 cache.insert(&v1);
405 cache.insert(&v2);
406 cache.insert(&v3);
407 cache.insert(&v4);
408 cache.insert(&v5);
409
410 cache.delete(&v2);
412 assert_eq!(cache.len(), 2);
413 let lowest = cache.lowest_key().unwrap();
414 assert_eq!(lowest, v5[0].clone().unwrap().as_scalar_ref_impl());
415
416 let v6 = [
418 Some(Timestamptz::from_secs(1001).unwrap().to_scalar_value()),
419 Some(Timestamptz::from_secs(1234).unwrap().to_scalar_value()),
420 ];
421 cache.insert(&v6);
422 assert_eq!(cache.len(), 2);
423
424 let v7 = [
426 Some(Timestamptz::from_secs(950).unwrap().to_scalar_value()),
427 Some(Timestamptz::from_secs(1234).unwrap().to_scalar_value()),
428 ];
429 cache.insert(&v7);
430 assert_eq!(cache.len(), 3);
431
432 cache.delete(&v7);
434 assert_eq!(cache.len(), 2);
435
436 cache.delete(&v5);
438 assert_eq!(cache.len(), 1);
439 assert_eq!(
440 cache.lowest_key().unwrap(),
441 v1[0].clone().unwrap().as_scalar_ref_impl()
442 );
443
444 cache.delete(&v1);
446 assert_eq!(cache.len(), 0);
447 assert!(!cache.is_synced());
448
449 cache.insert(&v1);
451 assert_eq!(cache.len(), 0);
452 }
453
454 #[test]
455 fn test_watermark_cache_syncing() {
456 let v1 = [
457 Some(Timestamptz::from_secs(1000).unwrap().to_scalar_value()),
458 Some(1000i64.into()),
459 ];
460 let v2 = [
461 Some(Timestamptz::from_secs(3000).unwrap().to_scalar_value()),
462 Some(1000i64.into()),
463 ];
464 let v3 = [
465 Some(Timestamptz::from_secs(2000).unwrap().to_scalar_value()),
466 Some(1000i64.into()),
467 ];
468 let mut cache = StateTableWatermarkCache::new(3);
469 let mut filler = cache.begin_syncing();
470 filler.insert_unchecked(DefaultOrdered(v1.to_owned_row()), ());
471 filler.insert_unchecked(DefaultOrdered(v2.to_owned_row()), ());
472 filler.insert_unchecked(DefaultOrdered(v3.to_owned_row()), ());
473 filler.finish();
474 assert_eq!(cache.len(), 3);
475 assert_eq!(
476 cache.lowest_key().unwrap(),
477 v1[0].clone().unwrap().as_scalar_ref_impl()
478 );
479 }
480}