1use std::collections::HashMap;
16use std::ops::Bound;
17use std::ops::Bound::*;
18use std::sync::Arc;
19
20use bytes::{BufMut, Bytes, BytesMut};
21use either::Either;
22use foyer::CacheHint;
23use futures::{Stream, StreamExt, TryStreamExt, pin_mut};
24use futures_async_stream::for_await;
25use itertools::{Itertools, izip};
26use risingwave_common::array::stream_record::Record;
27use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
28use risingwave_common::bitmap::Bitmap;
29use risingwave_common::catalog::{
30 ColumnDesc, ColumnId, TableId, TableOption, get_dist_key_in_pk_indices,
31};
32use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
33use risingwave_common::row::{self, Once, OwnedRow, Row, RowExt, once};
34use risingwave_common::types::{DataType, Datum, DefaultOrd, DefaultOrdered, ScalarImpl};
35use risingwave_common::util::column_index_mapping::ColIndexMapping;
36use risingwave_common::util::epoch::EpochPair;
37use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
38use risingwave_common::util::row_serde::OrderedRowSerde;
39use risingwave_common::util::sort_util::OrderType;
40use risingwave_common::util::value_encoding::BasicSerde;
41use risingwave_hummock_sdk::HummockReadEpoch;
42use risingwave_hummock_sdk::key::{
43 CopyFromSlice, TableKey, TableKeyRange, end_bound_of_prefix, prefixed_range_with_vnode,
44 start_bound_of_excluded_prefix,
45};
46use risingwave_hummock_sdk::table_watermark::{
47 VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
48};
49use risingwave_pb::catalog::Table;
50use risingwave_storage::StateStore;
51use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
52use risingwave_storage::hummock::CachePolicy;
53use risingwave_storage::mem_table::MemTableError;
54use risingwave_storage::row_serde::find_columns_by_ids;
55use risingwave_storage::row_serde::row_serde_util::{
56 deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode,
57};
58use risingwave_storage::row_serde::value_serde::ValueRowSerde;
59use risingwave_storage::store::*;
60use risingwave_storage::table::merge_sort::merge_sort;
61use risingwave_storage::table::{
62 ChangeLogRow, KeyedRow, TableDistribution, deserialize_log_stream,
63};
64use thiserror_ext::AsReport;
65use tracing::{Instrument, trace};
66
67use crate::cache::cache_may_stale;
68use crate::common::state_cache::{StateCache, StateCacheFiller};
69use crate::common::table::state_table_cache::StateTableWatermarkCache;
70use crate::executor::StreamExecutorResult;
71
/// Capacity of the watermark cache used when `USE_WATERMARK_CACHE` is enabled.
const WATERMARK_CACHE_ENTRIES: usize = 16;

/// Test-only fault injection point.
///
/// When `crate::consistency::insane()` is true, this macro `return;`s from the
/// enclosing function with probability 0.3. In this file it is placed before
/// mem-table writes, so the write is skipped. Because it expands to a bare
/// `return;`, it can only be used in functions returning `()`.
macro_rules! insane_mode_discard_point {
    () => {{
        use rand::Rng;
        let should_discard = crate::consistency::insane() && rand::rng().random_bool(0.3);
        if should_discard {
            return;
        }
    }};
}
87
/// Row-oriented state table on top of a local state store handle.
///
/// It serializes rows into pk-keyed key/value pairs, supports point gets and
/// writes, and applies watermark-based state cleaning at commit time.
/// `IS_REPLICATED` selects the replicated behavior: writes fill non-output
/// columns and reads project to `output_indices`. `USE_WATERMARK_CACHE` enables
/// maintenance of a [`StateTableWatermarkCache`].
#[derive(Clone)]
pub struct StateTableInner<
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Id of this table.
    table_id: TableId,

    /// Local store handle created via `store.new_local(..)`; reads, writes,
    /// flushes and epoch sealing of this table go through it.
    local_store: S::Local,

    /// The state store this table was created from (used e.g. to wait for
    /// committed epochs).
    store: S,

    /// Order-preserving serde for primary-key rows.
    pk_serde: OrderedRowSerde,

    /// Serde for the stored value part of each row.
    row_serde: SD,

    /// Indices of the primary-key columns within the table schema.
    pk_indices: Vec<usize>,

    /// Vnode distribution: owned vnodes plus where the distribution key /
    /// vnode column sit inside the pk.
    distribution: TableDistribution,

    /// When a point-get pk has exactly this many columns, its serialized pk
    /// (without vnode) is passed as the prefix hint; 0 disables the hint.
    prefix_hint_len: usize,

    /// Table options (e.g. retention seconds) forwarded to the store.
    table_option: TableOption,

    /// Columns that are serialized into the value; `None` means all columns
    /// in table order (no projection needed).
    value_indices: Option<Vec<usize>>,

    /// Watermark set by `update_watermark`, applied at the next commit.
    pending_watermark: Option<ScalarImpl>,
    /// Watermark applied by the latest commit, or restored from the store's
    /// persisted table watermarks when the table is created.
    committed_watermark: Option<ScalarImpl>,
    /// Cache of the lowest pks; only maintained when `USE_WATERMARK_CACHE`.
    watermark_cache: StateTableWatermarkCache,

    /// Data types of all table columns.
    data_types: Vec<DataType>,

    /// Maps a table column index to its position among the output columns
    /// (used to fill non-output columns for replicated tables).
    i2o_mapping: ColIndexMapping,

    /// Table-schema indices of the output columns (replicated tables).
    output_indices: Vec<usize>,

    /// Current op consistency level; may be switched at commit time.
    op_consistency_level: StateTableOpConsistencyLevel,

    /// Position inside the pk of the column carrying the clean watermark;
    /// `None` means the first pk column.
    clean_watermark_index_in_pk: Option<i32>,

    /// True between a commit and the matching
    /// `StateTablePostCommit::post_yield_barrier`.
    on_post_commit: bool,
}
166
/// Default state table: basic value serde, not replicated, no watermark cache.
pub type StateTable<S> = StateTableInner<S, BasicSerde>;
/// Replicated state table with a caller-chosen value serde.
pub type ReplicatedStateTable<S, SD> = StateTableInner<S, SD, true>;
/// Non-replicated state table with the watermark cache enabled.
pub type WatermarkCacheStateTable<S> = StateTableInner<S, BasicSerde, false, true>;
/// Non-replicated state table whose watermark-cache usage is a const parameter.
pub type WatermarkCacheParameterizedStateTable<S, const USE_WATERMARK_CACHE: bool> =
    StateTableInner<S, BasicSerde, false, USE_WATERMARK_CACHE>;
177
178impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
180 StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
181where
182 S: StateStore,
183 SD: ValueRowSerde,
184{
185 pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
188 self.local_store.init(InitOptions::new(epoch)).await?;
189 Ok(())
190 }
191
192 pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
193 self.store
194 .try_wait_epoch(
195 HummockReadEpoch::Committed(prev_epoch),
196 TryWaitEpochOptions {
197 table_id: self.table_id,
198 },
199 )
200 .await
201 }
202
203 pub fn state_store(&self) -> &S {
204 &self.store
205 }
206}
207
208fn consistent_old_value_op(
209 row_serde: impl ValueRowSerde,
210 is_log_store: bool,
211) -> OpConsistencyLevel {
212 OpConsistencyLevel::ConsistentOldValue {
213 check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
214 if first == second {
215 return true;
216 }
217 let first = match row_serde.deserialize(first) {
218 Ok(rows) => rows,
219 Err(e) => {
220 error!(error = %e.as_report(), value = ?first, "fail to deserialize serialized value");
221 return false;
222 }
223 };
224 let second = match row_serde.deserialize(second) {
225 Ok(rows) => rows,
226 Err(e) => {
227 error!(error = %e.as_report(), value = ?second, "fail to deserialize serialized value");
228 return false;
229 }
230 };
231 if first != second {
232 error!(first = ?first, second = ?second, "sanity check fail");
233 false
234 } else {
235 true
236 }
237 }),
238 is_log_store,
239 }
240}
241
/// Op consistency level requested for a state table. It is translated to the
/// storage-level [`OpConsistencyLevel`] when the local store is created and
/// when the level is switched at commit.
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
    /// No old-value checking (`OpConsistencyLevel::Inconsistent`).
    Inconsistent,
    /// Old values are checked (`consistent_old_value_op(.., false)`).
    ConsistentOldValue,
    /// Same old-value check as `ConsistentOldValue`, built with
    /// `is_log_store = true`.
    LogStoreEnabled,
}
254
255impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
260 StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
261where
262 S: StateStore,
263 SD: ValueRowSerde,
264{
265 pub async fn from_table_catalog(
269 table_catalog: &Table,
270 store: S,
271 vnodes: Option<Arc<Bitmap>>,
272 ) -> Self {
273 Self::from_table_catalog_with_consistency_level(
274 table_catalog,
275 store,
276 vnodes,
277 StateTableOpConsistencyLevel::ConsistentOldValue,
278 )
279 .await
280 }
281
282 pub async fn from_table_catalog_inconsistent_op(
284 table_catalog: &Table,
285 store: S,
286 vnodes: Option<Arc<Bitmap>>,
287 ) -> Self {
288 Self::from_table_catalog_with_consistency_level(
289 table_catalog,
290 store,
291 vnodes,
292 StateTableOpConsistencyLevel::Inconsistent,
293 )
294 .await
295 }
296
297 pub async fn from_table_catalog_with_consistency_level(
298 table_catalog: &Table,
299 store: S,
300 vnodes: Option<Arc<Bitmap>>,
301 consistency_level: StateTableOpConsistencyLevel,
302 ) -> Self {
303 Self::from_table_catalog_inner(table_catalog, store, vnodes, consistency_level, vec![])
304 .await
305 }
306
307 async fn from_table_catalog_inner(
309 table_catalog: &Table,
310 store: S,
311 vnodes: Option<Arc<Bitmap>>,
312 op_consistency_level: StateTableOpConsistencyLevel,
313 output_column_ids: Vec<ColumnId>,
314 ) -> Self {
315 let table_id = TableId::new(table_catalog.id);
316 let table_columns: Vec<ColumnDesc> = table_catalog
317 .columns
318 .iter()
319 .map(|col| col.column_desc.as_ref().unwrap().into())
320 .collect();
321 let data_types: Vec<DataType> = table_catalog
322 .columns
323 .iter()
324 .map(|col| {
325 col.get_column_desc()
326 .unwrap()
327 .get_column_type()
328 .unwrap()
329 .into()
330 })
331 .collect();
332 let order_types: Vec<OrderType> = table_catalog
333 .pk
334 .iter()
335 .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
336 .collect();
337 let dist_key_indices: Vec<usize> = table_catalog
338 .distribution_key
339 .iter()
340 .map(|dist_index| *dist_index as usize)
341 .collect();
342
343 let pk_indices = table_catalog
344 .pk
345 .iter()
346 .map(|col_order| col_order.column_index as usize)
347 .collect_vec();
348
349 let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() {
351 get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices).unwrap()
352 } else {
353 table_catalog
354 .get_dist_key_in_pk()
355 .iter()
356 .map(|idx| *idx as usize)
357 .collect()
358 };
359
360 let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
361 let vnode_col_idx = *idx as usize;
362 pk_indices.iter().position(|&i| vnode_col_idx == i)
363 });
364
365 let distribution =
366 TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
367 assert_eq!(
368 distribution.vnode_count(),
369 table_catalog.vnode_count(),
370 "vnode count mismatch, scanning table {} under wrong distribution?",
371 table_catalog.name,
372 );
373
374 let pk_data_types = pk_indices
375 .iter()
376 .map(|i| table_columns[*i].data_type.clone())
377 .collect();
378 let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);
379
380 let input_value_indices = table_catalog
381 .value_indices
382 .iter()
383 .map(|val| *val as usize)
384 .collect_vec();
385
386 let no_shuffle_value_indices = (0..table_columns.len()).collect_vec();
387
388 let value_indices = match input_value_indices.len() == table_columns.len()
390 && input_value_indices == no_shuffle_value_indices
391 {
392 true => None,
393 false => Some(input_value_indices),
394 };
395 let prefix_hint_len = table_catalog.read_prefix_len_hint as usize;
396
397 let make_row_serde = || {
398 SD::new(
399 Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
400 Arc::from(table_columns.clone().into_boxed_slice()),
401 )
402 };
403
404 let state_table_op_consistency_level = op_consistency_level;
405 let op_consistency_level = match op_consistency_level {
406 StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
407 StateTableOpConsistencyLevel::ConsistentOldValue => {
408 let row_serde = make_row_serde();
409 consistent_old_value_op(row_serde, false)
410 }
411 StateTableOpConsistencyLevel::LogStoreEnabled => {
412 let row_serde = make_row_serde();
413 consistent_old_value_op(row_serde, true)
414 }
415 };
416
417 let table_option = TableOption::new(table_catalog.retention_seconds);
418 let new_local_options = if IS_REPLICATED {
419 NewLocalOptions::new_replicated(
420 table_id,
421 op_consistency_level,
422 table_option,
423 distribution.vnodes().clone(),
424 )
425 } else {
426 NewLocalOptions::new(
427 table_id,
428 op_consistency_level,
429 table_option,
430 distribution.vnodes().clone(),
431 )
432 };
433 let local_state_store = store.new_local(new_local_options).await;
434
435 let row_serde = make_row_serde();
436
437 assert_eq!(
443 table_catalog.version.is_some(),
444 row_serde.kind().is_column_aware()
445 );
446
447 let watermark_serde = if pk_indices.is_empty() {
449 None
450 } else {
451 match table_catalog.clean_watermark_index_in_pk {
452 None => Some(pk_serde.index(0)),
453 Some(clean_watermark_index_in_pk) => {
454 Some(pk_serde.index(clean_watermark_index_in_pk as usize))
455 }
456 }
457 };
458 let max_watermark_of_vnodes = distribution
459 .vnodes()
460 .iter_vnodes()
461 .filter_map(|vnode| local_state_store.get_table_watermark(vnode))
462 .max();
463 let committed_watermark = if let Some(deser) = watermark_serde
464 && let Some(max_watermark) = max_watermark_of_vnodes
465 {
466 let deserialized = deser.deserialize(&max_watermark).ok().and_then(|row| {
467 assert!(row.len() == 1);
468 row[0].clone()
469 });
470 if deserialized.is_none() {
471 tracing::error!(
472 vnodes = ?distribution.vnodes(),
473 watermark = ?max_watermark,
474 "Failed to deserialize persisted watermark from state store.",
475 );
476 }
477 deserialized
478 } else {
479 None
480 };
481
482 let watermark_cache = if USE_WATERMARK_CACHE {
483 StateTableWatermarkCache::new(WATERMARK_CACHE_ENTRIES)
484 } else {
485 StateTableWatermarkCache::new(0)
486 };
487
488 let output_column_ids_to_input_idx = output_column_ids
490 .iter()
491 .enumerate()
492 .map(|(pos, id)| (*id, pos))
493 .collect::<HashMap<_, _>>();
494
495 let columns: Vec<ColumnDesc> = table_catalog
497 .columns
498 .iter()
499 .map(|c| c.column_desc.as_ref().unwrap().into())
500 .collect_vec();
501
502 let mut i2o_mapping = vec![None; columns.len()];
506 for (i, column) in columns.iter().enumerate() {
507 if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
508 i2o_mapping[i] = Some(*pos);
509 }
510 }
511 let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());
513
514 let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);
516
517 Self {
518 table_id,
519 local_store: local_state_store,
520 store,
521 pk_serde,
522 row_serde,
523 pk_indices,
524 distribution,
525 prefix_hint_len,
526 table_option,
527 value_indices,
528 pending_watermark: None,
529 committed_watermark,
530 watermark_cache,
531 data_types,
532 output_indices,
533 i2o_mapping,
534 op_consistency_level: state_table_op_consistency_level,
535 clean_watermark_index_in_pk: table_catalog.clean_watermark_index_in_pk,
536 on_post_commit: false,
537 }
538 }
539
540 pub fn get_data_types(&self) -> &[DataType] {
541 &self.data_types
542 }
543
544 pub fn table_id(&self) -> u32 {
545 self.table_id.table_id
546 }
547
548 pub fn epoch(&self) -> u64 {
550 self.local_store.epoch()
551 }
552
553 fn compute_prefix_vnode(&self, pk_prefix: impl Row) -> VirtualNode {
555 self.distribution
556 .try_compute_vnode_by_pk_prefix(pk_prefix)
557 .expect("For streaming, the given prefix must be enough to calculate the vnode")
558 }
559
560 pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
562 self.distribution.compute_vnode_by_pk(pk)
563 }
564
565 pub fn pk_indices(&self) -> &[usize] {
568 &self.pk_indices
569 }
570
571 pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
575 assert!(IS_REPLICATED);
576 self.pk_indices
577 .iter()
578 .map(|&i| self.output_indices.iter().position(|&j| i == j))
579 .collect()
580 }
581
582 pub fn pk_serde(&self) -> &OrderedRowSerde {
583 &self.pk_serde
584 }
585
586 pub fn vnodes(&self) -> &Arc<Bitmap> {
587 self.distribution.vnodes()
588 }
589
590 pub fn value_indices(&self) -> &Option<Vec<usize>> {
591 &self.value_indices
592 }
593
594 fn is_dirty(&self) -> bool {
595 self.local_store.is_dirty() || self.pending_watermark.is_some()
596 }
597
598 pub fn is_consistent_op(&self) -> bool {
599 matches!(
600 self.op_consistency_level,
601 StateTableOpConsistencyLevel::ConsistentOldValue
602 | StateTableOpConsistencyLevel::LogStoreEnabled
603 )
604 }
605}
606
607impl<S, SD, const USE_WATERMARK_CACHE: bool> StateTableInner<S, SD, true, USE_WATERMARK_CACHE>
608where
609 S: StateStore,
610 SD: ValueRowSerde,
611{
612 pub async fn from_table_catalog_with_output_column_ids(
614 table_catalog: &Table,
615 store: S,
616 vnodes: Option<Arc<Bitmap>>,
617 output_column_ids: Vec<ColumnId>,
618 ) -> Self {
619 Self::from_table_catalog_inner(
620 table_catalog,
621 store,
622 vnodes,
623 StateTableOpConsistencyLevel::Inconsistent,
624 output_column_ids,
625 )
626 .await
627 }
628}
629
630impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
632 StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
633where
634 S: StateStore,
635 SD: ValueRowSerde,
636{
637 pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
639 let encoded_row: Option<Bytes> = self.get_encoded_row(pk).await?;
640 match encoded_row {
641 Some(encoded_row) => {
642 let row = self.row_serde.deserialize(&encoded_row)?;
643 if IS_REPLICATED {
644 let row = row.project(&self.output_indices);
647 Ok(Some(row.into_owned_row()))
648 } else {
649 Ok(Some(OwnedRow::new(row)))
650 }
651 }
652 None => Ok(None),
653 }
654 }
655
656 pub async fn get_encoded_row(&self, pk: impl Row) -> StreamExecutorResult<Option<Bytes>> {
658 assert!(pk.len() <= self.pk_indices.len());
659
660 let serialized_pk =
661 serialize_pk_with_vnode(&pk, &self.pk_serde, self.compute_vnode_by_pk(&pk));
662
663 let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() {
664 Some(serialized_pk.slice(VirtualNode::SIZE..))
665 } else {
666 #[cfg(debug_assertions)]
667 if self.prefix_hint_len != 0 {
668 warn!(
669 "prefix_hint_len is not equal to pk.len(), may not be able to utilize bloom filter"
670 );
671 }
672 None
673 };
674
675 let read_options = ReadOptions {
676 prefix_hint,
677 retention_seconds: self.table_option.retention_seconds,
678 table_id: self.table_id,
679 cache_policy: CachePolicy::Fill(CacheHint::Normal),
680 ..Default::default()
681 };
682
683 self.local_store
684 .get(serialized_pk, read_options)
685 .await
686 .map_err(Into::into)
687 }
688}
689
/// Handle returned by a state table commit.
///
/// The commit sets the table's `on_post_commit` flag. Only
/// [`StateTablePostCommit::post_yield_barrier`] clears it, optionally applying
/// a vnode bitmap update. The next commit asserts that the flag is clear, so
/// this handle must not be dropped unused.
#[must_use]
pub struct StateTablePostCommit<
    'a,
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// The committed table, exclusively borrowed until post-commit completes.
    inner: &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
}
717
718impl<'a, S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
719 StateTablePostCommit<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
720where
721 S: StateStore,
722 SD: ValueRowSerde,
723{
724 pub async fn post_yield_barrier(
725 mut self,
726 new_vnodes: Option<Arc<Bitmap>>,
727 ) -> StreamExecutorResult<
728 Option<(
729 (
730 Arc<Bitmap>,
731 Arc<Bitmap>,
732 &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
733 ),
734 bool,
735 )>,
736 > {
737 self.inner.on_post_commit = false;
738 Ok(if let Some(new_vnodes) = new_vnodes {
739 let (old_vnodes, cache_may_stale) =
740 self.update_vnode_bitmap(new_vnodes.clone()).await?;
741 Some(((new_vnodes, old_vnodes, self.inner), cache_may_stale))
742 } else {
743 None
744 })
745 }
746
747 pub fn inner(&self) -> &StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE> {
748 &*self.inner
749 }
750
751 async fn update_vnode_bitmap(
753 &mut self,
754 new_vnodes: Arc<Bitmap>,
755 ) -> StreamExecutorResult<(Arc<Bitmap>, bool)> {
756 assert!(
757 !self.inner.is_dirty(),
758 "vnode bitmap should only be updated when state table is clean"
759 );
760 let prev_vnodes = self
761 .inner
762 .local_store
763 .update_vnode_bitmap(new_vnodes.clone())
764 .await?;
765 assert_eq!(
766 &prev_vnodes,
767 self.inner.vnodes(),
768 "state table and state store vnode bitmap mismatches"
769 );
770
771 if self.inner.distribution.is_singleton() {
772 assert_eq!(
773 &new_vnodes,
774 self.inner.vnodes(),
775 "should not update vnode bitmap for singleton table"
776 );
777 }
778 assert_eq!(self.inner.vnodes().len(), new_vnodes.len());
779
780 let cache_may_stale = cache_may_stale(self.inner.vnodes(), &new_vnodes);
781
782 if cache_may_stale {
783 self.inner.pending_watermark = None;
784 if USE_WATERMARK_CACHE {
785 self.inner.watermark_cache.clear();
786 }
787 }
788
789 Ok((
790 self.inner.distribution.update_vnode_bitmap(new_vnodes),
791 cache_may_stale,
792 ))
793 }
794}
795
796impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
798 StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
799where
800 S: StateStore,
801 SD: ValueRowSerde,
802{
803 fn handle_mem_table_error(&self, e: StorageError) {
804 let e = match e.into_inner() {
805 ErrorKind::MemTable(e) => e,
806 _ => unreachable!("should only get memtable error"),
807 };
808 match *e {
809 MemTableError::InconsistentOperation { key, prev, new } => {
810 let (vnode, key) = deserialize_pk_with_vnode(&key, &self.pk_serde).unwrap();
811 panic!(
812 "mem-table operation inconsistent! table_id: {}, vnode: {}, key: {:?}, prev: {}, new: {}",
813 self.table_id(),
814 vnode,
815 &key,
816 prev.debug_fmt(&self.row_serde),
817 new.debug_fmt(&self.row_serde),
818 )
819 }
820 }
821 }
822
823 fn serialize_value(&self, value: impl Row) -> Bytes {
824 if let Some(value_indices) = self.value_indices.as_ref() {
825 self.row_serde
826 .serialize(value.project(value_indices))
827 .into()
828 } else {
829 self.row_serde.serialize(value).into()
830 }
831 }
832
833 fn insert_inner(&mut self, key: TableKey<Bytes>, value_bytes: Bytes) {
834 insane_mode_discard_point!();
835 self.local_store
836 .insert(key, value_bytes, None)
837 .unwrap_or_else(|e| self.handle_mem_table_error(e));
838 }
839
840 fn delete_inner(&mut self, key: TableKey<Bytes>, value_bytes: Bytes) {
841 insane_mode_discard_point!();
842 self.local_store
843 .delete(key, value_bytes)
844 .unwrap_or_else(|e| self.handle_mem_table_error(e));
845 }
846
847 fn update_inner(
848 &mut self,
849 key_bytes: TableKey<Bytes>,
850 old_value_bytes: Option<Bytes>,
851 new_value_bytes: Bytes,
852 ) {
853 insane_mode_discard_point!();
854 self.local_store
855 .insert(key_bytes, new_value_bytes, old_value_bytes)
856 .unwrap_or_else(|e| self.handle_mem_table_error(e));
857 }
858
859 pub fn insert(&mut self, value: impl Row) {
862 let pk_indices = &self.pk_indices;
863 let pk = (&value).project(pk_indices);
864 if USE_WATERMARK_CACHE {
865 self.watermark_cache.insert(&pk);
866 }
867
868 let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk));
869 let value_bytes = self.serialize_value(value);
870 self.insert_inner(key_bytes, value_bytes);
871 }
872
873 pub fn delete(&mut self, old_value: impl Row) {
876 let pk_indices = &self.pk_indices;
877 let pk = (&old_value).project(pk_indices);
878 if USE_WATERMARK_CACHE {
879 self.watermark_cache.delete(&pk);
880 }
881
882 let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk));
883 let value_bytes = self.serialize_value(old_value);
884 self.delete_inner(key_bytes, value_bytes);
885 }
886
887 pub fn update(&mut self, old_value: impl Row, new_value: impl Row) {
889 let old_pk = (&old_value).project(self.pk_indices());
890 let new_pk = (&new_value).project(self.pk_indices());
891 debug_assert!(
892 Row::eq(&old_pk, new_pk),
893 "pk should not change: {old_pk:?} vs {new_pk:?}",
894 );
895
896 let new_key_bytes =
897 serialize_pk_with_vnode(new_pk, &self.pk_serde, self.compute_vnode_by_pk(new_pk));
898 let old_value_bytes = self.serialize_value(old_value);
899 let new_value_bytes = self.serialize_value(new_value);
900
901 self.update_inner(new_key_bytes, Some(old_value_bytes), new_value_bytes);
902 }
903
904 pub fn write_record(&mut self, record: Record<impl Row>) {
906 match record {
907 Record::Insert { new_row } => self.insert(new_row),
908 Record::Delete { old_row } => self.delete(old_row),
909 Record::Update { old_row, new_row } => self.update(old_row, new_row),
910 }
911 }
912
913 fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
914 fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
915 }
916
917 #[allow(clippy::disallowed_methods)]
920 pub fn write_chunk(&mut self, chunk: StreamChunk) {
921 let chunk = if IS_REPLICATED {
922 self.fill_non_output_indices(chunk)
923 } else {
924 chunk
925 };
926 let (chunk, op) = chunk.into_parts();
927
928 let vnodes = self
929 .distribution
930 .compute_chunk_vnode(&chunk, &self.pk_indices);
931
932 let values = if let Some(ref value_indices) = self.value_indices {
933 chunk.project(value_indices).serialize_with(&self.row_serde)
934 } else {
935 chunk.serialize_with(&self.row_serde)
936 };
937
938 let key_chunk = chunk.project(self.pk_indices());
943 let vnode_and_pks = key_chunk
944 .rows_with_holes()
945 .zip_eq_fast(vnodes.iter())
946 .map(|(r, vnode)| {
947 let mut buffer = BytesMut::new();
948 buffer.put_slice(&vnode.to_be_bytes()[..]);
949 if let Some(r) = r {
950 self.pk_serde.serialize(r, &mut buffer);
951 }
952 (r, buffer.freeze())
953 })
954 .collect_vec();
955
956 if !key_chunk.is_compacted() {
957 for ((op, (key, key_bytes), value), vis) in
958 izip!(op.iter(), vnode_and_pks, values).zip_eq_debug(key_chunk.visibility().iter())
959 {
960 if vis {
961 match op {
962 Op::Insert | Op::UpdateInsert => {
963 if USE_WATERMARK_CACHE && let Some(ref pk) = key {
964 self.watermark_cache.insert(pk);
965 }
966 self.insert_inner(TableKey(key_bytes), value);
967 }
968 Op::Delete | Op::UpdateDelete => {
969 if USE_WATERMARK_CACHE && let Some(ref pk) = key {
970 self.watermark_cache.delete(pk);
971 }
972 self.delete_inner(TableKey(key_bytes), value);
973 }
974 }
975 }
976 }
977 } else {
978 for (op, (key, key_bytes), value) in izip!(op.iter(), vnode_and_pks, values) {
979 match op {
980 Op::Insert | Op::UpdateInsert => {
981 if USE_WATERMARK_CACHE && let Some(ref pk) = key {
982 self.watermark_cache.insert(pk);
983 }
984 self.insert_inner(TableKey(key_bytes), value);
985 }
986 Op::Delete | Op::UpdateDelete => {
987 if USE_WATERMARK_CACHE && let Some(ref pk) = key {
988 self.watermark_cache.delete(pk);
989 }
990 self.delete_inner(TableKey(key_bytes), value);
991 }
992 }
993 }
994 }
995 }
996
997 pub fn update_watermark(&mut self, watermark: ScalarImpl) {
1003 trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark");
1004 self.pending_watermark = Some(watermark);
1005 }
1006
1007 pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> {
1010 self.committed_watermark.as_ref()
1011 }
1012
1013 pub async fn commit(
1014 &mut self,
1015 new_epoch: EpochPair,
1016 ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
1017 {
1018 self.commit_inner(new_epoch, None).await
1019 }
1020
1021 #[cfg(test)]
1022 pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
1023 self.commit_assert_no_update_vnode_bitmap(new_epoch).await
1024 }
1025
1026 pub async fn commit_assert_no_update_vnode_bitmap(
1027 &mut self,
1028 new_epoch: EpochPair,
1029 ) -> StreamExecutorResult<()> {
1030 let post_commit = self.commit_inner(new_epoch, None).await?;
1031 post_commit.post_yield_barrier(None).await?;
1032 Ok(())
1033 }
1034
1035 pub async fn commit_may_switch_consistent_op(
1036 &mut self,
1037 new_epoch: EpochPair,
1038 op_consistency_level: StateTableOpConsistencyLevel,
1039 ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
1040 {
1041 if self.op_consistency_level != op_consistency_level {
1042 info!(
1043 ?new_epoch,
1044 prev_op_consistency_level = ?self.op_consistency_level,
1045 ?op_consistency_level,
1046 table_id = self.table_id.table_id,
1047 "switch to new op consistency level"
1048 );
1049 self.commit_inner(new_epoch, Some(op_consistency_level))
1050 .await
1051 } else {
1052 self.commit_inner(new_epoch, None).await
1053 }
1054 }
1055
1056 async fn commit_inner(
1057 &mut self,
1058 new_epoch: EpochPair,
1059 switch_consistent_op: Option<StateTableOpConsistencyLevel>,
1060 ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
1061 {
1062 assert!(!self.on_post_commit);
1063 assert_eq!(self.epoch(), new_epoch.prev);
1064 let switch_op_consistency_level = switch_consistent_op.map(|new_consistency_level| {
1065 assert_ne!(self.op_consistency_level, new_consistency_level);
1066 self.op_consistency_level = new_consistency_level;
1067 match new_consistency_level {
1068 StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
1069 StateTableOpConsistencyLevel::ConsistentOldValue => {
1070 consistent_old_value_op(self.row_serde.clone(), false)
1071 }
1072 StateTableOpConsistencyLevel::LogStoreEnabled => {
1073 consistent_old_value_op(self.row_serde.clone(), true)
1074 }
1075 }
1076 });
1077 trace!(
1078 table_id = %self.table_id,
1079 epoch = ?self.epoch(),
1080 "commit state table"
1081 );
1082
1083 let mut table_watermarks = None;
1084 if self.is_dirty() {
1085 self.local_store
1086 .flush()
1087 .instrument(tracing::info_span!("state_table_flush"))
1088 .await?;
1089 table_watermarks = self.commit_pending_watermark();
1090 }
1091 self.local_store.seal_current_epoch(
1092 new_epoch.curr,
1093 SealCurrentEpochOptions {
1094 table_watermarks,
1095 switch_op_consistency_level,
1096 },
1097 );
1098
1099 if USE_WATERMARK_CACHE && !self.watermark_cache.is_synced() {
1101 if let Some(ref watermark) = self.committed_watermark {
1102 let range: (Bound<Once<Datum>>, Bound<Once<Datum>>) =
1103 (Included(once(Some(watermark.clone()))), Unbounded);
1104 let mut pks = Vec::with_capacity(self.watermark_cache.capacity());
1113 {
1114 let mut streams = vec![];
1115 for vnode in self.vnodes().iter_vnodes() {
1116 let stream = self
1117 .iter_keyed_row_with_vnode(vnode, &range, PrefetchOptions::default())
1118 .await?;
1119 streams.push(Box::pin(stream));
1120 }
1121 let merged_stream = merge_sort(streams);
1122 pin_mut!(merged_stream);
1123
1124 #[for_await]
1125 for entry in merged_stream.take(self.watermark_cache.capacity()) {
1126 let keyed_row = entry?;
1127 let pk = self.pk_serde.deserialize(keyed_row.key())?;
1128 if !pk.is_null_at(self.clean_watermark_index_in_pk.unwrap_or(0) as usize) {
1130 pks.push(pk);
1131 }
1132 }
1133 }
1134
1135 let mut filler = self.watermark_cache.begin_syncing();
1136 for pk in pks {
1137 filler.insert_unchecked(DefaultOrdered(pk), ());
1138 }
1139 filler.finish();
1140
1141 let n_cache_entries = self.watermark_cache.len();
1142 if n_cache_entries < self.watermark_cache.capacity() {
1143 self.watermark_cache.set_table_row_count(n_cache_entries);
1144 }
1145 }
1146 }
1147
1148 self.on_post_commit = true;
1149 Ok(StateTablePostCommit { inner: self })
1150 }
1151
1152 fn commit_pending_watermark(
1154 &mut self,
1155 ) -> Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)> {
1156 let watermark = self.pending_watermark.take();
1157 watermark.as_ref().inspect(|watermark| {
1158 trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning");
1159 });
1160
1161 let watermark_serializer = if self.pk_indices().is_empty() {
1162 None
1163 } else {
1164 match self.clean_watermark_index_in_pk {
1165 None => Some(self.pk_serde.index(0)),
1166 Some(clean_watermark_index_in_pk) => {
1167 Some(self.pk_serde.index(clean_watermark_index_in_pk as usize))
1168 }
1169 }
1170 };
1171
1172 let watermark_type = match self.clean_watermark_index_in_pk {
1173 None => WatermarkSerdeType::PkPrefix,
1174 Some(clean_watermark_index_in_pk) => match clean_watermark_index_in_pk {
1175 0 => WatermarkSerdeType::PkPrefix,
1176 _ => WatermarkSerdeType::NonPkPrefix,
1177 },
1178 };
1179
1180 let should_clean_watermark = match watermark {
1181 Some(ref watermark) => {
1182 if USE_WATERMARK_CACHE && self.watermark_cache.is_synced() {
1183 if let Some(key) = self.watermark_cache.lowest_key() {
1184 watermark.as_scalar_ref_impl().default_cmp(&key).is_ge()
1185 } else {
1186 false
1191 }
1192 } else {
1193 true
1197 }
1198 }
1199 None => false,
1200 };
1201
1202 let watermark_suffix = watermark.as_ref().map(|watermark| {
1203 serialize_pk(
1204 row::once(Some(watermark.clone())),
1205 watermark_serializer.as_ref().unwrap(),
1206 )
1207 });
1208
1209 let mut seal_watermark: Option<(WatermarkDirection, VnodeWatermark, WatermarkSerdeType)> =
1210 None;
1211
1212 if should_clean_watermark && let Some(watermark_suffix) = watermark_suffix {
1214 trace!(table_id = %self.table_id, watermark = ?watermark_suffix, vnodes = ?{
1215 self.vnodes().iter_vnodes().collect_vec()
1216 }, "delete range");
1217
1218 let order_type = watermark_serializer
1219 .as_ref()
1220 .unwrap()
1221 .get_order_types()
1222 .get(0)
1223 .unwrap();
1224
1225 if order_type.is_ascending() {
1226 seal_watermark = Some((
1227 WatermarkDirection::Ascending,
1228 VnodeWatermark::new(
1229 self.vnodes().clone(),
1230 Bytes::copy_from_slice(watermark_suffix.as_ref()),
1231 ),
1232 watermark_type,
1233 ));
1234 } else {
1235 seal_watermark = Some((
1236 WatermarkDirection::Descending,
1237 VnodeWatermark::new(
1238 self.vnodes().clone(),
1239 Bytes::copy_from_slice(watermark_suffix.as_ref()),
1240 ),
1241 watermark_type,
1242 ));
1243 }
1244 }
1245 self.committed_watermark = watermark;
1246
1247 if USE_WATERMARK_CACHE && seal_watermark.is_some() {
1255 self.watermark_cache.clear();
1256 }
1257
1258 seal_watermark.map(|(direction, watermark, is_non_pk_prefix)| {
1259 (direction, vec![watermark], is_non_pk_prefix)
1260 })
1261 }
1262
1263 pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
1264 self.local_store.try_flush().await?;
1265 Ok(())
1266 }
1267}
1268
1269pub trait RowStream<'a> = Stream<Item = StreamExecutorResult<OwnedRow>> + 'a;
1270pub trait KeyedRowStream<'a> = Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a;
1271pub trait PkRowStream<'a, K> = Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a;
1272
1273impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
1275 StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
1276where
1277 S: StateStore,
1278 SD: ValueRowSerde,
1279{
1280 pub async fn iter_with_vnode(
1283 &self,
1284
1285 vnode: VirtualNode,
1289 pk_range: &(Bound<impl Row>, Bound<impl Row>),
1290 prefetch_options: PrefetchOptions,
1291 ) -> StreamExecutorResult<impl RowStream<'_>> {
1292 Ok(deserialize_keyed_row_stream::<'_, ()>(
1293 self.iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
1294 .await?,
1295 &self.row_serde,
1296 )
1297 .map_ok(|(_, row)| row))
1298 }
1299
1300 pub async fn iter_keyed_row_with_vnode(
1301 &self,
1302 vnode: VirtualNode,
1303 pk_range: &(Bound<impl Row>, Bound<impl Row>),
1304 prefetch_options: PrefetchOptions,
1305 ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
1306 Ok(deserialize_keyed_row_stream(
1307 self.iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
1308 .await?,
1309 &self.row_serde,
1310 )
1311 .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
1312 }
1313
1314 pub async fn iter_with_vnode_and_output_indices(
1315 &self,
1316 vnode: VirtualNode,
1317 pk_range: &(Bound<impl Row>, Bound<impl Row>),
1318 prefetch_options: PrefetchOptions,
1319 ) -> StreamExecutorResult<impl RowStream<'_>> {
1320 assert!(IS_REPLICATED);
1321 let stream = self
1322 .iter_with_vnode(vnode, pk_range, prefetch_options)
1323 .await?;
1324 Ok(stream.map(|row| row.map(|row| row.project(&self.output_indices).into_owned_row())))
1325 }
1326
1327 async fn iter_kv(
1328 &self,
1329 table_key_range: TableKeyRange,
1330 prefix_hint: Option<Bytes>,
1331 prefetch_options: PrefetchOptions,
1332 ) -> StreamExecutorResult<<S::Local as LocalStateStore>::Iter<'_>> {
1333 let read_options = ReadOptions {
1334 prefix_hint,
1335 retention_seconds: self.table_option.retention_seconds,
1336 table_id: self.table_id,
1337 prefetch_options,
1338 cache_policy: CachePolicy::Fill(CacheHint::Normal),
1339 ..Default::default()
1340 };
1341
1342 Ok(self.local_store.iter(table_key_range, read_options).await?)
1343 }
1344
1345 async fn rev_iter_kv(
1346 &self,
1347 table_key_range: TableKeyRange,
1348 prefix_hint: Option<Bytes>,
1349 prefetch_options: PrefetchOptions,
1350 ) -> StreamExecutorResult<<S::Local as LocalStateStore>::RevIter<'_>> {
1351 let read_options = ReadOptions {
1352 prefix_hint,
1353 retention_seconds: self.table_option.retention_seconds,
1354 table_id: self.table_id,
1355 prefetch_options,
1356 cache_policy: CachePolicy::Fill(CacheHint::Normal),
1357 ..Default::default()
1358 };
1359
1360 Ok(self
1361 .local_store
1362 .rev_iter(table_key_range, read_options)
1363 .await?)
1364 }
1365
1366 pub async fn iter_with_prefix(
1370 &self,
1371 pk_prefix: impl Row,
1372 sub_range: &(Bound<impl Row>, Bound<impl Row>),
1373 prefetch_options: PrefetchOptions,
1374 ) -> StreamExecutorResult<impl RowStream<'_>> {
1375 let stream = self.iter_with_prefix_inner::<false, ()>(pk_prefix, sub_range, prefetch_options)
1376 .await?;
1377 Ok(stream.map_ok(|(_, row)| row))
1378 }
1379
1380 pub async fn get_from_one_row_table(&self) -> StreamExecutorResult<Option<OwnedRow>> {
1382 let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
1383 let stream = self
1384 .iter_with_prefix(row::empty(), sub_range, Default::default())
1385 .await?;
1386 pin_mut!(stream);
1387
1388 if let Some(res) = stream.next().await {
1389 let value = res?.into_owned_row();
1390 assert!(stream.next().await.is_none());
1391 Ok(Some(value))
1392 } else {
1393 Ok(None)
1394 }
1395 }
1396
1397 pub async fn get_from_one_value_table(&self) -> StreamExecutorResult<Option<ScalarImpl>> {
1402 Ok(self
1403 .get_from_one_row_table()
1404 .await?
1405 .and_then(|row| row[0].clone()))
1406 }
1407
1408 pub async fn iter_keyed_row_with_prefix(
1409 &self,
1410 pk_prefix: impl Row,
1411 sub_range: &(Bound<impl Row>, Bound<impl Row>),
1412 prefetch_options: PrefetchOptions,
1413 ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
1414 Ok(
1415 self.iter_with_prefix_inner::<false, Bytes>(pk_prefix, sub_range, prefetch_options)
1416 .await?.map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)),
1417 )
1418 }
1419
1420 pub async fn rev_iter_with_prefix(
1422 &self,
1423 pk_prefix: impl Row,
1424 sub_range: &(Bound<impl Row>, Bound<impl Row>),
1425 prefetch_options: PrefetchOptions,
1426 ) -> StreamExecutorResult<impl RowStream<'_>> {
1427 Ok(
1428 self.iter_with_prefix_inner::<true, ()>(pk_prefix, sub_range, prefetch_options)
1429 .await?.map_ok(|(_, row)| row),
1430 )
1431 }
1432
1433 async fn iter_with_prefix_inner<const REVERSE: bool, K: CopyFromSlice>(
1434 &self,
1435 pk_prefix: impl Row,
1436 sub_range: &(Bound<impl Row>, Bound<impl Row>),
1437 prefetch_options: PrefetchOptions,
1438 ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
1439 let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
1440 let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
1441
1442 let vnode = self.compute_prefix_vnode(&pk_prefix);
1446
1447 let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
1449 if self.prefix_hint_len != 0 {
1450 debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
1451 }
1452 let prefix_hint = {
1453 if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() {
1454 None
1455 } else {
1456 let encoded_prefix_len = self
1457 .pk_serde
1458 .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;
1459
1460 Some(Bytes::copy_from_slice(
1461 &encoded_prefix[..encoded_prefix_len],
1462 ))
1463 }
1464 };
1465
1466 trace!(
1467 table_id = %self.table_id(),
1468 ?prefix_hint, ?pk_prefix,
1469 ?pk_prefix_indices,
1470 iter_direction = if REVERSE { "reverse" } else { "forward" },
1471 "storage_iter_with_prefix"
1472 );
1473
1474 let memcomparable_range =
1475 prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);
1476
1477 let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode);
1478
1479 Ok(if REVERSE {
1480 futures::future::Either::Left(deserialize_keyed_row_stream(
1481 self.rev_iter_kv(
1482 memcomparable_range_with_vnode,
1483 prefix_hint,
1484 prefetch_options,
1485 )
1486 .await?,
1487 &self.row_serde,
1488 ))
1489 } else {
1490 futures::future::Either::Right(deserialize_keyed_row_stream(
1491 self.iter_kv(
1492 memcomparable_range_with_vnode,
1493 prefix_hint,
1494 prefetch_options,
1495 )
1496 .await?,
1497 &self.row_serde,
1498 ))
1499 })
1500 }
1501
1502 async fn iter_kv_with_pk_range(
1505 &self,
1506 pk_range: &(Bound<impl Row>, Bound<impl Row>),
1507 vnode: VirtualNode,
1511 prefetch_options: PrefetchOptions,
1512 ) -> StreamExecutorResult<<S::Local as LocalStateStore>::Iter<'_>> {
1513 let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);
1514 let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode);
1515
1516 self.iter_kv(memcomparable_range_with_vnode, None, prefetch_options)
1518 .await
1519 }
1520
1521 #[cfg(test)]
1522 pub fn get_watermark_cache(&self) -> &StateTableWatermarkCache {
1523 &self.watermark_cache
1524 }
1525}
1526
1527impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
1528 StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
1529where
1530 S: StateStore,
1531 SD: ValueRowSerde,
1532{
1533 pub async fn iter_log_with_vnode(
1534 &self,
1535 vnode: VirtualNode,
1536 epoch_range: (u64, u64),
1537 pk_range: &(Bound<impl Row>, Bound<impl Row>),
1538 ) -> StreamExecutorResult<impl Stream<Item = StreamExecutorResult<ChangeLogRow>> + '_> {
1539 let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);
1540 let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode);
1541 Ok(deserialize_log_stream(
1542 self.store
1543 .iter_log(
1544 epoch_range,
1545 memcomparable_range_with_vnode,
1546 ReadLogOptions {
1547 table_id: self.table_id,
1548 },
1549 )
1550 .await?,
1551 &self.row_serde,
1552 )
1553 .map_err(Into::into))
1554 }
1555}
1556
1557fn deserialize_keyed_row_stream<'a, K: CopyFromSlice>(
1558 iter: impl StateStoreIter + 'a,
1559 deserializer: &'a impl ValueRowSerde,
1560) -> impl PkRowStream<'a, K> {
1561 iter.into_stream(move |(key, value)| {
1562 Ok((
1563 K::copy_from_slice(key.user_key.table_key.as_ref()),
1564 deserializer.deserialize(value).map(OwnedRow::new)?,
1565 ))
1566 })
1567 .map_err(Into::into)
1568}
1569
1570pub fn prefix_range_to_memcomparable(
1571 pk_serde: &OrderedRowSerde,
1572 range: &(Bound<impl Row>, Bound<impl Row>),
1573) -> (Bound<Bytes>, Bound<Bytes>) {
1574 (
1575 start_range_to_memcomparable(pk_serde, &range.0),
1576 end_range_to_memcomparable(pk_serde, &range.1, None),
1577 )
1578}
1579
1580fn prefix_and_sub_range_to_memcomparable(
1581 pk_serde: &OrderedRowSerde,
1582 sub_range: &(Bound<impl Row>, Bound<impl Row>),
1583 pk_prefix: impl Row,
1584) -> (Bound<Bytes>, Bound<Bytes>) {
1585 let (range_start, range_end) = sub_range;
1586 let prefix_serializer = pk_serde.prefix(pk_prefix.len());
1587 let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
1588 let start_range = match range_start {
1589 Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))),
1590 Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))),
1591 Unbounded => Bound::Included(Either::Right(&pk_prefix)),
1592 };
1593 let end_range = match range_end {
1594 Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)),
1595 Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)),
1596 Unbounded => Unbounded,
1597 };
1598 (
1599 start_range_to_memcomparable(pk_serde, &start_range),
1600 end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
1601 )
1602}
1603
1604fn start_range_to_memcomparable<R: Row>(
1605 pk_serde: &OrderedRowSerde,
1606 bound: &Bound<R>,
1607) -> Bound<Bytes> {
1608 let serialize_pk_prefix = |pk_prefix: &R| {
1609 let prefix_serializer = pk_serde.prefix(pk_prefix.len());
1610 serialize_pk(pk_prefix, &prefix_serializer)
1611 };
1612 match bound {
1613 Unbounded => Unbounded,
1614 Included(r) => {
1615 let serialized = serialize_pk_prefix(r);
1616
1617 Included(serialized)
1618 }
1619 Excluded(r) => {
1620 let serialized = serialize_pk_prefix(r);
1621
1622 start_bound_of_excluded_prefix(&serialized)
1623 }
1624 }
1625}
1626
1627fn end_range_to_memcomparable<R: Row>(
1628 pk_serde: &OrderedRowSerde,
1629 bound: &Bound<R>,
1630 serialized_pk_prefix: Option<Bytes>,
1631) -> Bound<Bytes> {
1632 let serialize_pk_prefix = |pk_prefix: &R| {
1633 let prefix_serializer = pk_serde.prefix(pk_prefix.len());
1634 serialize_pk(pk_prefix, &prefix_serializer)
1635 };
1636 match bound {
1637 Unbounded => match serialized_pk_prefix {
1638 Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix),
1639 None => Unbounded,
1640 },
1641 Included(r) => {
1642 let serialized = serialize_pk_prefix(r);
1643
1644 end_bound_of_prefix(&serialized)
1645 }
1646 Excluded(r) => {
1647 let serialized = serialize_pk_prefix(r);
1648 Excluded(serialized)
1649 }
1650 }
1651}
1652
1653fn fill_non_output_indices(
1654 i2o_mapping: &ColIndexMapping,
1655 data_types: &[DataType],
1656 chunk: StreamChunk,
1657) -> StreamChunk {
1658 let cardinality = chunk.cardinality();
1659 let (ops, columns, vis) = chunk.into_inner();
1660 let mut full_columns = Vec::with_capacity(data_types.len());
1661 for (i, data_type) in data_types.iter().enumerate() {
1662 if let Some(j) = i2o_mapping.try_map(i) {
1663 full_columns.push(columns[j].clone());
1664 } else {
1665 let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
1666 column_builder.append_n_null(cardinality);
1667 let column: ArrayRef = column_builder.finish().into();
1668 full_columns.push(column)
1669 }
1670 }
1671 let data_chunk = DataChunk::new(full_columns, vis);
1672 StreamChunk::from_parts(ops, data_chunk)
1673}
1674
#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    /// Compares the pretty-printed (`{:#?}`) form of `actual` against the snapshot `expect`.
    fn check(actual: impl Debug, expect: Expect) {
        expect.assert_eq(&format!("{actual:#?}"));
    }

    /// A two-column replicated chunk is expanded to the three table columns. Mapped columns
    /// land at their table positions and the unmapped one is filled with NULL.
    #[test]
    fn test_fill_non_output_indices() {
        let table_types = vec![DataType::Int32; 3];
        let output_types = [DataType::Int32, DataType::Int32];
        let rows = [OwnedRow::new(vec![Some(222_i32.into()), Some(2_i32.into())])];
        let replicated_chunk =
            StreamChunk::from_parts(vec![Op::Insert], DataChunk::from_rows(&rows, &output_types));
        // Table column 0 <- output column 1, table column 1 is not output,
        // table column 2 <- output column 0.
        let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
        let filled_chunk = fill_non_output_indices(&i2o_mapping, &table_types, replicated_chunk);
        check(
            filled_chunk,
            expect![[r#"
                StreamChunk { cardinality: 1, capacity: 1, data:
                +---+---+---+-----+
                | + | 2 |   | 222 |
                +---+---+---+-----+
                }"#]],
        );
    }
}