1use std::collections::BTreeMap;
22use std::ops::Bound;
23use std::sync::Arc;
24
25use anyhow::Context;
26use futures::StreamExt;
27use risingwave_common::bitmap::Bitmap;
28use risingwave_common::catalog::TableId;
29use risingwave_common::row::{CompactedRow, OwnedRow, Row, RowExt};
30use risingwave_common::types::{DataType, ScalarImpl};
31use risingwave_common::util::epoch::EpochPair;
32use risingwave_common::util::row_serde::OrderedRowSerde;
33use risingwave_common::util::sort_util::OrderType;
34use risingwave_common_estimate_size::{EstimateSize, KvSize};
35use risingwave_storage::StateStore;
36use risingwave_storage::store::PrefetchOptions;
37
38use super::hash_join::{JoinEntryError, JoinHashMapMetrics, TableInner};
39use super::join_row_set::JoinRowSet;
40use crate::cache::ManagedLruCache;
41use crate::common::metrics::MetricsInfo;
42use crate::common::table::state_table::{RowStream, StateTable, StateTablePostCommit};
43use crate::consistency::{consistency_error, enable_strict_consistency};
44use crate::executor::error::StreamExecutorResult;
45use crate::executor::monitor::StreamingMetrics;
46use crate::task::{ActorId, AtomicU64Ref, FragmentId};
47
/// Memcmp-encoded bytes of the inequality key; first-level key inside a cache entry.
type InequalKeyBytes = Vec<u8>;
/// Memcmp-encoded bytes of the state-table pk columns; second-level key inside a cache entry.
type PkSuffixBytes = Vec<u8>;
51
/// Strategy for how rows are materialized inside the AsOf-join cache.
///
/// `Encoded` is the in-cache representation. Implementations trade CPU
/// (encode/decode work per access) against memory footprint.
pub trait AsOfRowEncoding: 'static + Send + Sync {
    /// In-cache representation of a row.
    type Encoded: EstimateSize + Clone + Send + Sync;
    /// Converts a row into the cached representation.
    fn encode(row: &OwnedRow) -> Self::Encoded;
    /// Reconstructs the row from its cached representation, given the
    /// data types of all state-table columns.
    fn decode(encoded: &Self::Encoded, data_types: &[DataType]) -> OwnedRow;
}
60
61pub struct AsOfCpuEncoding;
63
64impl AsOfRowEncoding for AsOfCpuEncoding {
65 type Encoded = OwnedRow;
66
67 fn encode(row: &OwnedRow) -> OwnedRow {
68 row.clone()
70 }
71
72 fn decode(encoded: &OwnedRow, _data_types: &[DataType]) -> OwnedRow {
73 encoded.clone()
75 }
76}
77
78pub struct AsOfMemoryEncoding;
80
81impl AsOfRowEncoding for AsOfMemoryEncoding {
82 type Encoded = CompactedRow;
83
84 fn encode(row: &OwnedRow) -> CompactedRow {
85 row.into()
86 }
87
88 fn decode(encoded: &CompactedRow, data_types: &[DataType]) -> OwnedRow {
89 encoded
90 .deserialize(data_types)
91 .expect("failed to decode compacted row in AsOf join cache")
92 }
93}
94
/// All cached rows belonging to one join key, organized as a two-level
/// ordered map so that both "closest inequality key" (range/bound) lookups
/// and "first row by pk within an inequality key" lookups are ordered scans.
pub struct AsOfJoinCacheEntry<V: EstimateSize + Clone> {
    /// inequality-key bytes -> (pk-suffix bytes -> encoded row).
    inner: BTreeMap<InequalKeyBytes, JoinRowSet<PkSuffixBytes, V>>,
    /// Incrementally-tracked heap size of all keys and values; kept in
    /// sync by `insert`/`delete`.
    kv_heap_size: KvSize,
}
106
107impl<V: EstimateSize + Clone> Default for AsOfJoinCacheEntry<V> {
108 fn default() -> Self {
109 Self {
110 inner: BTreeMap::new(),
111 kv_heap_size: KvSize::new(),
112 }
113 }
114}
115
impl<V: EstimateSize + Clone> EstimateSize for AsOfJoinCacheEntry<V> {
    fn estimated_heap_size(&self) -> usize {
        // Maintained incrementally on insert/delete, so this is O(1) and
        // never walks the nested maps.
        self.kv_heap_size.size()
    }
}
123
124impl<V: EstimateSize + Clone> AsOfJoinCacheEntry<V> {
125 fn upper_bound(&self, bound: Bound<&[u8]>) -> Option<&V> {
128 self.inner
129 .range::<[u8], _>((Bound::Unbounded, bound))
130 .next_back()
131 .and_then(|(_, pk_map)| pk_map.first_key_sorted().and_then(|k| pk_map.get(k)))
132 }
133
134 fn lower_bound(&self, bound: Bound<&[u8]>) -> Option<&V> {
137 self.inner
138 .range::<[u8], _>((bound, Bound::Unbounded))
139 .next()
140 .and_then(|(_, pk_map)| pk_map.first_key_sorted().and_then(|k| pk_map.get(k)))
141 }
142
143 fn range(
145 &self,
146 range: (Bound<&[u8]>, Bound<&[u8]>),
147 ) -> impl Iterator<Item = &V> + '_ + use<'_, V> {
148 self.inner
149 .range::<[u8], _>(range)
150 .flat_map(|(_, pk_map)| pk_map.values())
151 }
152
153 fn first_by_inequality(&self, ineq_key_bytes: &[u8]) -> Option<&V> {
155 self.inner
156 .get(ineq_key_bytes)
157 .and_then(|pk_map| pk_map.first_key_sorted().and_then(|k| pk_map.get(k)))
158 }
159
160 fn first_two_by_inequality(&self, ineq_key_bytes: &[u8]) -> (Option<&V>, Option<&V>) {
162 if let Some(pk_map) = self.inner.get(ineq_key_bytes) {
163 let (first_key, second_key) = pk_map.first_two_key_sorted();
164 let first = first_key.and_then(|k| pk_map.get(k));
165 let second = second_key.and_then(|k| pk_map.get(k));
166 (first, second)
167 } else {
168 (None, None)
169 }
170 }
171
172 fn insert(
174 &mut self,
175 ineq_key_bytes: InequalKeyBytes,
176 pk_suffix_bytes: PkSuffixBytes,
177 val: V,
178 ) -> Result<(), JoinEntryError> {
179 use std::collections::btree_map::Entry;
180 let pk_map = match self.inner.entry(ineq_key_bytes) {
181 Entry::Vacant(e) => {
182 self.kv_heap_size.add_val(e.key());
183 e.insert(JoinRowSet::default())
184 }
185 Entry::Occupied(e) => e.into_mut(),
186 };
187 if !enable_strict_consistency()
188 && let Some(old_val) = pk_map.remove(&pk_suffix_bytes)
189 {
190 self.kv_heap_size.sub(&pk_suffix_bytes, &old_val);
191 }
192 self.kv_heap_size.add(&pk_suffix_bytes, &val);
193 let ret = pk_map.try_insert(pk_suffix_bytes.clone(), val);
194 if !enable_strict_consistency() {
195 assert!(ret.is_ok(), "we have removed existing entry, if any");
196 }
197 ret.map(|_| ()).map_err(|_| JoinEntryError::Occupied)
198 }
199
200 fn delete(
202 &mut self,
203 ineq_key_bytes: &InequalKeyBytes,
204 pk_suffix_bytes: &PkSuffixBytes,
205 ) -> Result<(), JoinEntryError> {
206 if let Some(pk_map) = self.inner.get_mut(ineq_key_bytes) {
207 if let Some(old_val) = pk_map.remove(pk_suffix_bytes) {
208 self.kv_heap_size.sub(pk_suffix_bytes, &old_val);
209 if pk_map.is_empty() {
210 self.kv_heap_size.sub_val(ineq_key_bytes);
211 self.inner.remove(ineq_key_bytes);
212 }
213 Ok(())
214 } else if enable_strict_consistency() {
215 Err(JoinEntryError::Remove)
216 } else {
217 consistency_error!(
218 ?pk_suffix_bytes,
219 "removing an asof join cache entry but it's not in the cache"
220 );
221 Ok(())
222 }
223 } else if enable_strict_consistency() {
224 Err(JoinEntryError::Remove)
225 } else {
226 consistency_error!(
227 ?pk_suffix_bytes,
228 "removing an asof join cache entry but it's not in the cache"
229 );
230 Ok(())
231 }
232 }
233}
234
/// One side's state for an AsOf join: a state table holding all rows keyed
/// by join key, plus an optional in-memory LRU cache of fully-materialized
/// per-join-key entries ([`AsOfJoinCacheEntry`]).
pub struct AsOfJoinHashMap<S: StateStore, E: AsOfRowEncoding> {
    /// Underlying state table together with its pk/join-key index metadata.
    state: TableInner<S>,
    /// Column index of the inequality key within a full state-table row.
    inequality_key_idx: usize,
    /// Memcmp serializer for the single inequality-key column.
    inequality_key_serializer: OrderedRowSerde,
    /// Memcmp serializer for the state-table pk columns.
    pk_suffix_serializer: OrderedRowSerde,
    /// Column indices of the pk columns within a full state-table row.
    pk_suffix_indices: Vec<usize>,
    /// LRU cache keyed by join key; `None` disables caching entirely.
    cache: Option<ManagedLruCache<OwnedRow, AsOfJoinCacheEntry<E::Encoded>>>,
    /// Column indices of the join-key columns within a full state-table row.
    join_key_indices: Vec<usize>,
    /// Data types of all state-table columns, used to decode cached rows.
    state_all_data_types: Vec<DataType>,
    /// Per-side lookup / miss / flush counters.
    metrics: JoinHashMapMetrics,
}
263
impl<S: StateStore, E: AsOfRowEncoding> AsOfJoinHashMap<S, E> {
    /// Builds the hash map around `state_table`.
    ///
    /// All index arguments (`state_join_key_indices`, `state_pk_indices`,
    /// `inequality_key_idx`) are column indices into a full state-table
    /// row. The LRU cache is created only when `watermark_epoch` is given.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        state_join_key_indices: Vec<usize>,
        state_table: StateTable<S>,
        state_pk_indices: Vec<usize>,
        inequality_key_idx: usize,
        state_all_data_types: Vec<DataType>,
        watermark_epoch: Option<AtomicU64Ref>,
        metrics: Arc<StreamingMetrics>,
        actor_id: ActorId,
        fragment_id: FragmentId,
        side: &'static str,
    ) -> Self {
        let join_table_id = state_table.table_id();

        // Serializer for the single inequality-key column, ascending so the
        // memcmp bytes order matches the value order.
        let inequality_key_serializer = OrderedRowSerde::new(
            vec![state_all_data_types[inequality_key_idx].clone()],
            vec![OrderType::ascending()],
        );

        // Serializer for the pk columns, all ascending.
        let pk_suffix_serializer = OrderedRowSerde::new(
            state_pk_indices
                .iter()
                .map(|idx| state_all_data_types[*idx].clone())
                .collect(),
            vec![OrderType::ascending(); state_pk_indices.len()],
        );

        let pk_suffix_indices = state_pk_indices.clone();
        let join_key_indices = state_join_key_indices.clone();

        // Cache is only enabled when a watermark epoch is supplied; its
        // eviction is driven by that epoch (see `evict_cache`/`flush`).
        let cache = watermark_epoch.map(|epoch_ref| {
            let metrics_info = MetricsInfo::new(
                metrics.clone(),
                join_table_id,
                actor_id,
                format!("asof join {}", side),
            );
            ManagedLruCache::unbounded(epoch_ref, metrics_info)
        });

        let state = TableInner::new(state_pk_indices, state_join_key_indices, state_table, None);

        Self {
            state,
            inequality_key_idx,
            inequality_key_serializer,
            pk_suffix_serializer,
            pk_suffix_indices,
            cache,
            join_key_indices,
            state_all_data_types,
            metrics: JoinHashMapMetrics::new(&metrics, actor_id, fragment_id, side, join_table_id),
        }
    }

    /// Initializes the underlying state table with the first epoch.
    pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.state.table.init_epoch(epoch).await?;
        Ok(())
    }

    /// Forwards a state-cleaning watermark to the state table.
    pub fn update_watermark(&mut self, watermark: ScalarImpl) {
        self.state.table.update_watermark(watermark);
    }

    /// Id of the underlying state table.
    pub fn table_id(&self) -> TableId {
        self.state.table.table_id()
    }

    /// Number of join keys currently resident in the cache
    /// (0 when the cache is disabled).
    pub fn entry_count(&self) -> usize {
        self.cache.as_ref().map_or(0, |c| c.len())
    }

    /// Serializes a row that consists of only the inequality key.
    fn serialize_inequal_key(&self, row: &impl Row) -> Vec<u8> {
        row.memcmp_serialize(&self.inequality_key_serializer)
    }

    /// Projects the inequality-key column out of a full state-table row
    /// and serializes it.
    fn serialize_inequal_key_from_state_row(&self, row: &impl Row) -> Vec<u8> {
        row.project(&[self.inequality_key_idx])
            .memcmp_serialize(&self.inequality_key_serializer)
    }

    /// Projects the pk columns out of a full state-table row and
    /// serializes them.
    fn serialize_pk_suffix(&self, row: &impl Row) -> Vec<u8> {
        row.project(&self.pk_suffix_indices)
            .memcmp_serialize(&self.pk_suffix_serializer)
    }

    /// Loads the complete entry for `join_key` from the state table into
    /// the cache unless it is already resident. No-op when the cache is
    /// disabled. Updates the lookup / lookup-miss metrics.
    async fn ensure_cache_populated(&mut self, join_key: &OwnedRow) -> StreamExecutorResult<()> {
        let needs_populate = match &self.cache {
            Some(cache) => !cache.contains(join_key),
            None => return Ok(()),
        };
        self.metrics.inc_lookup();
        if !needs_populate {
            return Ok(());
        }
        self.metrics.inc_lookup_miss();

        // Full scan of every row under the join-key prefix to build the
        // entry, so a resident entry is always complete.
        let sub_range: (Bound<OwnedRow>, Bound<OwnedRow>) = (Bound::Unbounded, Bound::Unbounded);
        let table_iter = self
            .state
            .table
            .iter_with_prefix(join_key, &sub_range, PrefetchOptions::default())
            .await?;
        let mut pinned = std::pin::pin!(table_iter);
        let mut entry = AsOfJoinCacheEntry::<E::Encoded>::default();
        while let Some(row) = pinned.next().await.transpose()? {
            let ineq_key = self.serialize_inequal_key_from_state_row(&row);
            let pk_suffix = self.serialize_pk_suffix(&row);
            entry
                .insert(ineq_key, pk_suffix, E::encode(&row))
                .with_context(|| format!("row: {}", row.display()))?;
        }
        self.cache.as_mut().unwrap().put(join_key.clone(), entry);
        Ok(())
    }

    /// Inserts a row into the state table and, if the entry for its join
    /// key is resident, into the cache as well. A non-resident entry is
    /// NOT populated here — only the miss metric is bumped.
    pub fn insert(&mut self, value: impl Row) -> StreamExecutorResult<()> {
        if self.cache.is_some() {
            let owned_row = value.to_owned_row();
            let ineq_key = self.serialize_inequal_key_from_state_row(&owned_row);
            let pk_suffix = self.serialize_pk_suffix(&owned_row);
            let join_key = (&owned_row).project(&self.join_key_indices).to_owned_row();
            if let Some(cache) = &mut self.cache
                && let Some(mut entry) = cache.get_mut(&join_key)
            {
                entry
                    .insert(ineq_key, pk_suffix, E::encode(&owned_row))
                    .with_context(|| format!("row: {}", owned_row.display()))?;
            } else {
                self.metrics.inc_insert_cache_miss();
            }
            self.state.table.insert(owned_row);
        } else {
            // No cache: write straight to the state table.
            self.state.table.insert(value);
        }
        Ok(())
    }

    /// Deletes a row from the state table and, if the entry for its join
    /// key is resident, from the cache as well.
    pub fn delete(&mut self, value: impl Row) -> StreamExecutorResult<()> {
        if self.cache.is_some() {
            let owned_row = value.to_owned_row();
            let ineq_key = self.serialize_inequal_key_from_state_row(&owned_row);
            let pk_suffix = self.serialize_pk_suffix(&owned_row);
            let join_key = (&owned_row).project(&self.join_key_indices).to_owned_row();
            if let Some(cache) = &mut self.cache
                && let Some(mut entry) = cache.get_mut(&join_key)
            {
                entry
                    .delete(&ineq_key, &pk_suffix)
                    .with_context(|| format!("row: {}", owned_row.display()))?;
            }
            self.state.table.delete(owned_row);
        } else {
            self.state.table.delete(value);
        }
        Ok(())
    }

    /// Evicts cache entries according to the managed watermark epoch.
    pub fn evict_cache(&mut self) {
        if let Some(cache) = &mut self.cache {
            cache.evict();
        }
    }

    /// Commits buffered state-table changes for `epoch` (after an eviction
    /// pass) and returns the post-commit handle, which must be driven past
    /// the barrier via `post_yield_barrier`.
    pub async fn flush(
        &mut self,
        epoch: EpochPair,
    ) -> StreamExecutorResult<AsOfJoinHashMapPostCommit<'_, S, E>> {
        self.metrics.flush();
        if let Some(cache) = &mut self.cache {
            cache.evict();
        }
        let state_post_commit = self.state.table.commit(epoch).await?;
        Ok(AsOfJoinHashMapPostCommit {
            state: state_post_commit,
            cache: self.cache.as_mut(),
        })
    }

    /// Best-effort spill of buffered changes without committing an epoch.
    pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.state.table.try_flush().await?;
        Ok(())
    }

    /// Under `join_key`, finds the *largest* inequality key within
    /// `(Unbounded, bound)` and returns that group's smallest-pk row.
    ///
    /// Served from the cache when enabled; otherwise a reverse scan finds
    /// the greatest in-range inequality key, then a forward scan restricted
    /// to exactly that key yields its first row.
    pub async fn upper_bound_by_inequality_with_jk_prefix(
        &mut self,
        join_key: impl Row,
        bound: Bound<&impl Row>,
    ) -> StreamExecutorResult<Option<OwnedRow>> {
        if self.cache.is_some() {
            let join_key_owned = join_key.to_owned_row();
            // Serialize the bound row so it compares in memcmp order.
            let bound_bytes = match &bound {
                Bound::Included(r) => Bound::Included(self.serialize_inequal_key(r)),
                Bound::Excluded(r) => Bound::Excluded(self.serialize_inequal_key(r)),
                Bound::Unbounded => Bound::Unbounded,
            };
            self.ensure_cache_populated(&join_key_owned).await?;
            let cache = self.cache.as_mut().unwrap();
            if let Some(entry) = cache.get(&join_key_owned) {
                let bound_ref = match &bound_bytes {
                    Bound::Included(b) => Bound::Included(b.as_slice()),
                    Bound::Excluded(b) => Bound::Excluded(b.as_slice()),
                    Bound::Unbounded => Bound::Unbounded,
                };
                return Ok(entry
                    .upper_bound(bound_ref)
                    .map(|v| E::decode(v, &self.state_all_data_types)));
            }
        }
        // Cache disabled (or entry unexpectedly absent): answer from the
        // state table directly.
        let sub_range = (Bound::<OwnedRow>::Unbounded, bound);
        let table_iter = self
            .state
            .table
            .rev_iter_with_prefix(&join_key, &sub_range, PrefetchOptions::default())
            .await?;
        let mut pinned_table_iter = std::pin::pin!(table_iter);
        let Some(rev_row) = pinned_table_iter.next().await.transpose()? else {
            return Ok(None);
        };
        // Re-scan forward restricted to the found inequality key so the
        // smallest-pk row of that group is returned (matching the cache
        // path's `first_key_sorted` semantics).
        let found_ineq_key = (&rev_row)
            .project(&[self.inequality_key_idx])
            .to_owned_row();
        let exact_range = (
            Bound::Included(&found_ineq_key),
            Bound::Included(&found_ineq_key),
        );
        let fwd_iter = self
            .state
            .table
            .iter_with_prefix(&join_key, &exact_range, PrefetchOptions::default())
            .await?;
        let mut pinned_fwd = std::pin::pin!(fwd_iter);
        let row = pinned_fwd.next().await.transpose()?;
        // Defensive fallback: the forward scan should see at least
        // `rev_row` — presumably this guards a racy/empty re-read; confirm.
        Ok(row.or(Some(rev_row)))
    }

    /// Under `join_key`, finds the *smallest* inequality key within
    /// `(bound, Unbounded)` and returns that group's smallest-pk row.
    pub async fn lower_bound_by_inequality_with_jk_prefix(
        &mut self,
        join_key: &impl Row,
        bound: Bound<&impl Row>,
    ) -> StreamExecutorResult<Option<OwnedRow>> {
        if self.cache.is_some() {
            let join_key_owned = join_key.to_owned_row();
            let bound_bytes = match &bound {
                Bound::Included(r) => Bound::Included(self.serialize_inequal_key(r)),
                Bound::Excluded(r) => Bound::Excluded(self.serialize_inequal_key(r)),
                Bound::Unbounded => Bound::Unbounded,
            };
            self.ensure_cache_populated(&join_key_owned).await?;
            let cache = self.cache.as_mut().unwrap();
            if let Some(entry) = cache.get(&join_key_owned) {
                let bound_ref = match &bound_bytes {
                    Bound::Included(b) => Bound::Included(b.as_slice()),
                    Bound::Excluded(b) => Bound::Excluded(b.as_slice()),
                    Bound::Unbounded => Bound::Unbounded,
                };
                return Ok(entry
                    .lower_bound(bound_ref)
                    .map(|v| E::decode(v, &self.state_all_data_types)));
            }
        }
        // Table path: a single forward scan suffices — its first row is the
        // first row of the smallest in-range inequality key.
        let sub_range = (bound, Bound::<OwnedRow>::Unbounded);
        let table_iter = self
            .state
            .table
            .iter_with_prefix(join_key, &sub_range, PrefetchOptions::default())
            .await?;
        let mut pinned_table_iter = std::pin::pin!(table_iter);
        let row = pinned_table_iter.next().await.transpose()?;
        Ok(row)
    }

    /// Forward state-table scan under `join_key` restricted to the given
    /// inequality-key range.
    async fn table_iter_by_inequality_with_jk_prefix<'a>(
        &'a self,
        join_key: &'a impl Row,
        range: &'a (Bound<impl Row>, Bound<impl Row>),
    ) -> StreamExecutorResult<impl RowStream<'a>> {
        self.state
            .table
            .iter_with_prefix(join_key, range, PrefetchOptions::default())
            .await
    }

    /// Returns the smallest-pk row with exactly `inequality_key` under
    /// `join_key`, if any.
    pub async fn first_by_inequality_with_jk_prefix(
        &mut self,
        join_key: &impl Row,
        inequality_key: &impl Row,
    ) -> StreamExecutorResult<Option<OwnedRow>> {
        if self.cache.is_some() {
            let join_key_owned = join_key.to_owned_row();
            let ineq_key_bytes = self.serialize_inequal_key(inequality_key);
            self.ensure_cache_populated(&join_key_owned).await?;
            let cache = self.cache.as_mut().unwrap();
            if let Some(entry) = cache.get(&join_key_owned) {
                return Ok(entry
                    .first_by_inequality(&ineq_key_bytes)
                    .map(|v| E::decode(v, &self.state_all_data_types)));
            }
        }
        // Table path: an inclusive point range on the inequality key.
        let range = (
            Bound::Included(inequality_key),
            Bound::Included(inequality_key),
        );
        let table_iter = self
            .table_iter_by_inequality_with_jk_prefix(join_key, &range)
            .await?;
        let mut pinned_table_iter = std::pin::pin!(table_iter);
        let row = pinned_table_iter.next().await.transpose()?;
        Ok(row)
    }

    /// Returns the two smallest-pk rows with exactly `inequality_key`
    /// under `join_key` (either or both may be `None`).
    pub async fn first_two_by_inequality_with_jk_prefix(
        &mut self,
        join_key: &impl Row,
        inequality_key: &impl Row,
    ) -> StreamExecutorResult<(Option<OwnedRow>, Option<OwnedRow>)> {
        if self.cache.is_some() {
            let join_key_owned = join_key.to_owned_row();
            let ineq_key_bytes = self.serialize_inequal_key(inequality_key);
            self.ensure_cache_populated(&join_key_owned).await?;
            let cache = self.cache.as_mut().unwrap();
            if let Some(entry) = cache.get(&join_key_owned) {
                let (first, second) = entry.first_two_by_inequality(&ineq_key_bytes);
                return Ok((
                    first.map(|v| E::decode(v, &self.state_all_data_types)),
                    second.map(|v| E::decode(v, &self.state_all_data_types)),
                ));
            }
        }
        let range = (
            Bound::Included(inequality_key),
            Bound::Included(inequality_key),
        );
        let table_iter = self
            .table_iter_by_inequality_with_jk_prefix(join_key, &range)
            .await?;
        let mut pinned_table_iter = std::pin::pin!(table_iter);
        let first = pinned_table_iter.next().await.transpose()?;
        // Only look for a second row when a first one exists.
        let second = if first.is_some() {
            pinned_table_iter.next().await.transpose()?
        } else {
            None
        };
        Ok((first, second))
    }

    /// Streams all rows under `join_key` whose inequality key falls in
    /// `inequality_key_range`, from the cache when resident (Left) or from
    /// the state table otherwise (Right).
    pub async fn range_by_inequality_with_jk_prefix<'a>(
        &'a mut self,
        join_key: &'a impl Row,
        inequality_key_range: &'a (Bound<impl Row>, Bound<impl Row>),
    ) -> StreamExecutorResult<impl RowStream<'a>> {
        if self.cache.is_some() {
            let join_key_owned = join_key.to_owned_row();
            let bound_lo = match &inequality_key_range.0 {
                Bound::Included(r) => Bound::Included(self.serialize_inequal_key(r)),
                Bound::Excluded(r) => Bound::Excluded(self.serialize_inequal_key(r)),
                Bound::Unbounded => Bound::Unbounded,
            };
            let bound_hi = match &inequality_key_range.1 {
                Bound::Included(r) => Bound::Included(self.serialize_inequal_key(r)),
                Bound::Excluded(r) => Bound::Excluded(self.serialize_inequal_key(r)),
                Bound::Unbounded => Bound::Unbounded,
            };
            self.ensure_cache_populated(&join_key_owned).await?;
            let cache = self.cache.as_mut().unwrap();
            if let Some(entry) = cache.get(&join_key_owned) {
                let range = (
                    bound_lo.as_ref().map(|b| b.as_slice()),
                    bound_hi.as_ref().map(|b| b.as_slice()),
                );
                let data_types = &self.state_all_data_types;
                // Decode eagerly per item; wrapped in a stream so both
                // branches unify under `RowStream`.
                return Ok(futures::future::Either::Left(futures::stream::iter(
                    entry.range(range).map(|v| Ok(E::decode(v, data_types))),
                )));
            }
        }
        let table_iter = self
            .state
            .table
            .iter_with_prefix(join_key, inequality_key_range, PrefetchOptions::default())
            .await?;
        Ok(futures::future::Either::Right(table_iter))
    }

    /// Whether the row's inequality-key column is NULL.
    pub fn check_inequal_key_null(&self, row: &impl Row) -> bool {
        row.datum_at(self.inequality_key_idx).is_none()
    }

    /// Projects the state-table pk columns out of `row`.
    pub fn get_pk_from_row<'a>(&'a self, row: impl Row + 'a) -> impl Row + 'a {
        row.project(&self.state.pk_indices)
    }
}
703
/// Post-commit handle returned by [`AsOfJoinHashMap::flush`]. Must be
/// consumed via `post_yield_barrier` so that a vnode-bitmap change cannot
/// silently leave stale entries in the cache.
#[must_use]
pub struct AsOfJoinHashMapPostCommit<'a, S: StateStore, E: AsOfRowEncoding> {
    /// The state table's own post-commit handle.
    state: StateTablePostCommit<'a, S>,
    /// Borrow of the owning map's cache, cleared when the table reports
    /// that cached data may be stale.
    cache: Option<&'a mut ManagedLruCache<OwnedRow, AsOfJoinCacheEntry<E::Encoded>>>,
}
709
710impl<S: StateStore, E: AsOfRowEncoding> AsOfJoinHashMapPostCommit<'_, S, E> {
711 pub async fn post_yield_barrier(
712 self,
713 vnode_bitmap: Option<Arc<Bitmap>>,
714 ) -> StreamExecutorResult<Option<bool>> {
715 let cache_may_stale = self.state.post_yield_barrier(vnode_bitmap.clone()).await?;
716 let keyed_cache_may_stale = cache_may_stale.map(|(_, changed)| changed);
717 if keyed_cache_may_stale.unwrap_or(false)
719 && let Some(cache) = self.cache
720 {
721 cache.clear();
722 }
723 Ok(keyed_cache_may_stale)
724 }
725}