risingwave_stream/executor/lookup/
cache.rs1use risingwave_common::array::Op;
16use risingwave_common::row::RowExt;
17use risingwave_common_estimate_size::collections::{EstimatedHashSet, EstimatedVec};
18
19use crate::cache::ManagedLruCache;
20use crate::common::metrics::MetricsInfo;
21use crate::consistency::consistency_panic;
22use crate::executor::prelude::*;
23
24pub type LookupEntryState = EstimatedHashSet<OwnedRow>;
25
26pub struct LookupCache {
28 data: ManagedLruCache<OwnedRow, LookupEntryState>,
29}
30
31impl LookupCache {
32 pub fn lookup(&mut self, key: &OwnedRow) -> Option<&LookupEntryState> {
34 self.data.get(key)
35 }
36
37 pub fn batch_update(&mut self, key: OwnedRow, value: EstimatedVec<OwnedRow>) {
39 self.data.push(key, LookupEntryState::from_vec(value));
40 }
41
42 pub fn apply_batch(&mut self, chunk: StreamChunk, arrange_join_keys: &[usize]) {
44 for (op, row) in chunk.rows() {
45 let key = row.project(arrange_join_keys).into_owned_row();
46 if let Some(mut values) = self.data.get_mut(&key) {
47 let row = row.into_owned_row();
49 match op {
50 Op::Insert | Op::UpdateInsert => {
51 if !values.insert(row) {
52 consistency_panic!("inserting a duplicated value");
53 }
54 }
55 Op::Delete | Op::UpdateDelete => {
56 if !values.remove(&row) {
57 consistency_panic!("row {:?} should be in the cache", row);
58 }
59 }
60 }
61 }
62 }
63 }
64
65 pub fn evict(&mut self) {
66 self.data.evict()
67 }
68
69 pub fn len(&self) -> usize {
70 self.data.len()
71 }
72
73 pub fn clear(&mut self) {
75 self.data.clear();
76 }
77
78 pub fn new(watermark_sequence: AtomicU64Ref, metrics_info: MetricsInfo) -> Self {
79 let cache = ManagedLruCache::unbounded(watermark_sequence, metrics_info);
80 Self { data: cache }
81 }
82}