// risingwave_stream/common/table/state_table_cache.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::array::Op;
16use risingwave_common::row::{OwnedRow, Row, RowExt};
17use risingwave_common::types::{DefaultOrdered, ScalarRefImpl};
18use risingwave_common_estimate_size::EstimateSize;
19
20use crate::common::state_cache::{StateCache, TopNStateCache};
21
22/// The watermark cache key is just an `OwnedRow` wrapped in `DefaultOrdered`.
23/// This is because we want to use the `DefaultOrdered` implementation of `Ord`.
24/// The assumption is that the watermark column is the first column in the row.
25/// So it should automatically be ordered by the watermark column.
26/// We disregard the ordering of the remaining PK datums.
27///
28/// TODO(kwannoel):
29/// We can also store bytes from encoded pk.
30/// The benefit is we deserialization of PK when refiling the cache.
31type WatermarkCacheKey = DefaultOrdered<OwnedRow>;
32
33/// Cache Invariants
34/// -----------------
35/// - If cache is synced, it will ALWAYS contain TopN entries. NOTE: TopN here refers to the lowest
36///   N entries, where the first is the minimum and so on.
37/// - Cache will not contain NULL values in the watermark column, they are ignored in watermark
38///   state cleaning. They will not be included in the cache's table row count either, since they
39///   are treated as invisible.
40///
41/// Updates to Cache
42/// -----------------
43/// INSERT
44///    A. Cache evicted. Update cached value.
45///    B. Cache uninitialized. Initialize cache, insert into `TopNCache`.
46///    C. Cache not empty. Insert into `TopNCache`.
47///
48/// DELETE
49///    A. Matches lowest value pk. Remove lowest value. Mark cache as Evicted.
50///       Later on Barrier we will refresh the cache with table scan.
51///       Since on barrier we will clean up all values before watermark,
52///       We have less rows to scan.
53///    B. Does not match. Do nothing.
54///
55/// UPDATE
56///    Nothing. Watermark is part of pk. Pk won't change.
57///
58/// BARRIER
59///    State table commit. See below.
60///
61/// STATE TABLE COMMIT
62///    A. Decide whether to do state cleaning:
63///        if `watermark_to_be_cleaned` < smallest val
64///           OR no value in cache + cache is synced:
65///        No need issue delete range.
66///        if `watermark_to_be_cleaned` => smallest val OR cache not synced:
67///        Issue delete ranges.
68///
69///    B. Refreshing the cache:
70///        On barrier, do table scan from `most_recently_cleaned_watermark` (inclusive) to +inf.
71///        Take the Top N rows and insert into cache.
72///        This has to be implemented in `state_table`.
73///        We do not need to store any values, just the keys.
74///
75/// TODO(kwannoel):
76/// Optimization: If cache is not full,
77/// we can also do point delete for each cache entry.
78/// Not sure if this is more optimal,
79/// we have to measure this scenario to see if it is indeed better.
80#[derive(EstimateSize, Clone)]
81pub struct StateTableWatermarkCache {
82    inner: TopNStateCache<WatermarkCacheKey, ()>,
83}
84
85impl StateTableWatermarkCache {
86    pub fn new(size: usize) -> Self {
87        Self {
88            inner: TopNStateCache::new(size),
89        }
90    }
91
92    /// NOTE(kwannoel): Unused. Requires row count from State Table.
93    /// On first initialization, we can use `row_count` 0.
94    /// But if state table is reconstructed after recovery, we need to obtain row count meta-data.
95    #[allow(dead_code)]
96    fn new_with_row_count(size: usize, row_count: usize) -> Self {
97        Self {
98            inner: TopNStateCache::with_table_row_count(size, row_count),
99        }
100    }
101
102    /// Get the lowest key.
103    fn first_key(&self) -> Option<&WatermarkCacheKey> {
104        self.inner.first_key_value().map(|(k, _)| k)
105    }
106
107    // Get the watermark value from the top key.
108    pub fn lowest_key(&self) -> Option<ScalarRefImpl<'_>> {
109        self.first_key().and_then(|k| k.0.datum_at(0))
110    }
111
112    /// Insert a new value.
113    pub fn insert(&mut self, key: &impl Row) {
114        if !key.is_null_at(0) {
115            self.inner.insert(DefaultOrdered(key.into_owned_row()), ());
116        }
117    }
118
119    /// Delete a value
120    /// If the watermark col is NULL, it will just be ignored.
121    pub fn delete(&mut self, key: &impl Row) {
122        if !key.is_null_at(0) {
123            self.inner.delete(&DefaultOrdered(key.into_owned_row()));
124        }
125    }
126
127    pub fn capacity(&self) -> usize {
128        self.inner.capacity()
129    }
130
131    pub fn len(&self) -> usize {
132        self.inner.len()
133    }
134
135    pub fn set_table_row_count(&mut self, table_row_count: usize) {
136        self.inner.set_table_row_count(table_row_count)
137    }
138
139    #[cfg(test)]
140    pub fn get_table_row_count(&self) -> &Option<usize> {
141        self.inner.get_table_row_count()
142    }
143}
144
145impl StateCache for StateTableWatermarkCache {
146    type Filler<'a> = &'a mut TopNStateCache<WatermarkCacheKey, ()>;
147    type Key = WatermarkCacheKey;
148    type Value = ();
149
150    fn is_synced(&self) -> bool {
151        self.inner.is_synced()
152    }
153
154    fn begin_syncing(&mut self) -> Self::Filler<'_> {
155        self.inner.begin_syncing()
156    }
157
158    fn insert(&mut self, key: Self::Key, value: Self::Value) -> Option<Self::Value> {
159        self.inner.insert(key, value)
160    }
161
162    fn delete(&mut self, key: &Self::Key) -> Option<Self::Value> {
163        self.inner.delete(key)
164    }
165
166    fn apply_batch(&mut self, batch: impl IntoIterator<Item = (Op, Self::Key, Self::Value)>) {
167        self.inner.apply_batch(batch)
168    }
169
170    fn clear(&mut self) {
171        self.inner.clear()
172    }
173
174    fn values(&self) -> impl Iterator<Item = &Self::Value> {
175        self.inner.values()
176    }
177
178    fn first_key_value(&self) -> Option<(&Self::Key, &Self::Value)> {
179        self.inner.first_key_value()
180    }
181}
182
#[cfg(test)]
mod tests {

    use risingwave_common::types::{Datum, Scalar, Timestamptz};

    use super::*;
    use crate::common::state_cache::StateCacheFiller;

    /// A non-NULL timestamptz datum at `secs` seconds since the epoch.
    fn ts(secs: i64) -> Datum {
        Some(Timestamptz::from_secs(secs).unwrap().to_scalar_value())
    }

    /// A two-column row of timestamps; the first column is the watermark column.
    fn ts_row(watermark_secs: i64, other_secs: i64) -> [Datum; 2] {
        [ts(watermark_secs), ts(other_secs)]
    }

    /// The watermark (first) datum of `row`, in the form returned by `lowest_key`.
    fn watermark_of(row: &[Datum; 2]) -> ScalarRefImpl<'_> {
        row[0].as_ref().unwrap().as_scalar_ref_impl()
    }

    /// With capacity 3 and no table row count, test the following sequence of inserts:
    /// [SYNC] an empty table first.
    /// [1000, ...] should insert, cache is empty.
    /// [999, ...] should insert, smaller than 1000, should be the lowest value.
    /// [2000, ...] should NOT insert.
    ///   It is larger than 1000, and we don't know the state table size,
    ///   for instance if the state table just recovered and does not have row count metadata.
    ///   This means there could be a row like [1001, ...] in the state table.
    /// [900, ...] should insert.
    /// [800, ...] should insert, smaller than 900, should be the lowest value.
    #[test]
    fn test_state_table_watermark_cache_inserts() {
        let v1 = ts_row(1000, 1234);
        let v2 = ts_row(999, 1234);
        let v3 = ts_row(2000, 1234);
        let v4 = ts_row(900, 1234);
        let v5 = ts_row(800, 1234);
        let mut cache = StateTableWatermarkCache::new(3);
        assert_eq!(cache.capacity(), 3);
        let filler = cache.begin_syncing();
        filler.finish();

        // Test 1000
        cache.insert(&v1);
        assert_eq!(cache.len(), 1);
        assert_eq!(cache.lowest_key(), Some(watermark_of(&v1)));

        // Test 999
        cache.insert(&v2);
        assert_eq!(cache.len(), 2);
        assert_eq!(cache.lowest_key(), Some(watermark_of(&v2)));

        // Test 2000
        cache.insert(&v3);
        assert_eq!(cache.len(), 2);
        assert_eq!(cache.lowest_key(), Some(watermark_of(&v2)));

        // Test 900
        cache.insert(&v4);
        assert_eq!(cache.len(), 3);
        assert_eq!(cache.lowest_key(), Some(watermark_of(&v4)));

        // Test 800
        cache.insert(&v5);
        assert_eq!(cache.len(), 3);
        assert_eq!(cache.lowest_key(), Some(watermark_of(&v5)));
    }

    /// Deleting a key that was never inserted from a synced, empty cache must not panic and
    /// must leave the cache empty.
    #[test]
    fn test_state_table_watermark_cache_delete_non_existent_value() {
        let mut cache = StateTableWatermarkCache::new(3);
        assert_eq!(cache.capacity(), 3);
        let filler = cache.begin_syncing();
        filler.finish();
        let v1 = ts_row(1000, 1234);
        cache.delete(&v1);
        assert_eq!(cache.len(), 0);
    }

    /// With capacity 3 and a table row count of 0, test the following sequence of inserts:
    /// [1000, ...] should insert, cache is empty.
    /// [999, ...] should insert, smaller than 1000, should be the lowest value.
    /// [2000, ...] should insert: although it is larger than the largest value (1000), the
    ///   cache rows still match the state table rows.
    /// [3000, ...] should be ignored.
    /// [900, ...] should insert, evicting 2000.
    #[test]
    fn test_state_table_watermark_cache_with_row_count_inserts() {
        let mut cache = StateTableWatermarkCache::new_with_row_count(3, 0);
        assert_eq!(cache.capacity(), 3);
        let filler = cache.begin_syncing();
        filler.finish();
        assert!(cache.first_key_value().is_none());
        assert!(cache.lowest_key().is_none());

        let v1 = ts_row(1000, 1000);
        cache.insert(&v1);
        assert_eq!(cache.len(), 1);
        assert_eq!(cache.lowest_key().unwrap(), watermark_of(&v1));

        let v2 = ts_row(999, 1234);
        cache.insert(&v2);
        assert_eq!(cache.len(), 2);
        assert_eq!(cache.lowest_key().unwrap(), watermark_of(&v2));

        let v3 = ts_row(2000, 1234);
        cache.insert(&v3);
        assert_eq!(cache.len(), 3);

        let v4 = ts_row(3000, 1234);
        cache.insert(&v4);
        assert_eq!(cache.len(), 3);

        let v5 = ts_row(900, 1234);
        cache.insert(&v5);
        assert_eq!(cache.len(), 3);
        assert_eq!(cache.lowest_key().unwrap(), watermark_of(&v5));
    }

    /// With capacity 3 and a table row count of 0, seed the following sequence of inserts:
    /// [1000, ...]
    /// [999, ...]
    /// [2000, ...]
    /// [3000, ...]
    /// [900, ...]
    ///
    /// In the cache there should be:
    /// [900, 999, 1000]
    ///
    /// Then run one DELETE.
    /// [999].
    /// The cache should be:
    /// [900, 1000].
    /// Lowest val: 900.
    ///
    /// Then run one INSERT.
    /// [1001].
    /// This should be ignored. It is larger than the largest val in the cache (1000),
    /// and the cache no longer matches the state table rows.
    ///
    /// Then run another INSERT.
    /// [950].
    /// This should be accepted. It is smaller than the largest val in the cache (1000).
    ///
    /// Then run DELETE.
    /// [950].
    /// The cache should be back to [900, 1000].
    ///
    /// Then run DELETE.
    /// [900].
    /// Lowest val: 1000.
    ///
    /// Then run DELETE.
    /// [1000].
    /// Lowest val: None. The cache is no longer synced.
    ///
    /// Then run INSERT.
    /// The cache is out of sync, so it should reject the insert.
    /// Cache len = 0.
    #[test]
    fn test_state_table_watermark_cache_with_row_count_deletes() {
        // Initial INSERT
        let mut cache = StateTableWatermarkCache::new_with_row_count(3, 0);
        assert_eq!(cache.capacity(), 3);
        let filler = cache.begin_syncing();
        filler.finish();
        let v1 = ts_row(1000, 1234);
        let v2 = ts_row(999, 1234);
        let v3 = ts_row(2000, 1234);
        let v4 = ts_row(3000, 1234);
        let v5 = ts_row(900, 1234);
        cache.insert(&v1);
        cache.insert(&v2);
        cache.insert(&v3);
        cache.insert(&v4);
        cache.insert(&v5);
        // Expected: [900, 999, 1000]
        assert_eq!(cache.len(), 3);
        assert_eq!(cache.lowest_key().unwrap(), watermark_of(&v5));

        // First Delete
        cache.delete(&v2);
        assert_eq!(cache.len(), 2);
        assert_eq!(cache.lowest_key().unwrap(), watermark_of(&v5));

        // Insert 1001
        let v6 = ts_row(1001, 1234);
        cache.insert(&v6);
        assert_eq!(cache.len(), 2);

        // Insert 950
        let v7 = ts_row(950, 1234);
        cache.insert(&v7);
        assert_eq!(cache.len(), 3);

        // Delete 950
        cache.delete(&v7);
        assert_eq!(cache.len(), 2);

        // Delete 900
        cache.delete(&v5);
        assert_eq!(cache.len(), 1);
        assert_eq!(cache.lowest_key().unwrap(), watermark_of(&v1));

        // Delete 1000
        cache.delete(&v1);
        assert_eq!(cache.len(), 0);
        assert!(!cache.is_synced());

        // INSERT after Out of sync
        cache.insert(&v1);
        assert_eq!(cache.len(), 0);
    }

    /// Filling a fresh cache through its filler, out of order (1000, 3000, 2000), makes all
    /// filled keys visible, with 1000 as the lowest key.
    #[test]
    fn test_watermark_cache_syncing() {
        let v1 = [ts(1000), Some(1000i64.into())];
        let v2 = [ts(3000), Some(1000i64.into())];
        let v3 = [ts(2000), Some(1000i64.into())];
        let mut cache = StateTableWatermarkCache::new(3);
        let mut filler = cache.begin_syncing();
        filler.insert_unchecked(DefaultOrdered(v1.to_owned_row()), ());
        filler.insert_unchecked(DefaultOrdered(v2.to_owned_row()), ());
        filler.insert_unchecked(DefaultOrdered(v3.to_owned_row()), ());
        filler.finish();
        assert_eq!(cache.len(), 3);
        assert_eq!(cache.lowest_key().unwrap(), watermark_of(&v1));
    }
}