use std::collections::HashMap;
use std::ops::Bound;
use std::ops::Bound::*;
use std::sync::Arc;

use bytes::{BufMut, Bytes, BytesMut};
use either::Either;
use foyer::Hint;
use futures::{Stream, StreamExt, TryStreamExt, pin_mut};
use futures_async_stream::for_await;
use itertools::{Itertools, izip};
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnDesc, ColumnId, TableId, TableOption, get_dist_key_in_pk_indices,
};
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
use risingwave_common::row::{self, Once, OwnedRow, Row, RowExt, once};
use risingwave_common::types::{DataType, Datum, DefaultOrd, DefaultOrdered, ScalarImpl};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{
    CopyFromSlice, TableKey, TableKeyRange, end_bound_of_prefix, prefixed_range_with_vnode,
    start_bound_of_excluded_prefix,
};
use risingwave_hummock_sdk::table_watermark::{
    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_pb::catalog::Table;
use risingwave_storage::StateStore;
use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::mem_table::MemTableError;
use risingwave_storage::row_serde::find_columns_by_ids;
use risingwave_storage::row_serde::row_serde_util::{
    deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode,
};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::*;
use risingwave_storage::table::merge_sort::merge_sort;
use risingwave_storage::table::{
    ChangeLogRow, KeyedRow, TableDistribution, deserialize_log_stream,
};
use thiserror_ext::AsReport;
use tracing::{Instrument, trace};

use crate::cache::cache_may_stale;
use crate::common::state_cache::{StateCache, StateCacheFiller};
use crate::common::table::state_table_cache::StateTableWatermarkCache;
use crate::executor::StreamExecutorResult;

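/// Capacity of the watermark cache: the number of smallest primary keys cached to decide
/// whether a state-cleaning watermark actually requires a range delete.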
const WATERMARK_CACHE_ENTRIES: usize = 16;

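/// When insane mode (`crate::consistency::insane()`) is enabled, randomly discard about 30%
/// of the write operations that reach this point.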
macro_rules! insane_mode_discard_point {
    () => {{
        use rand::Rng;
        if crate::consistency::insane() && rand::rng().random_bool(0.3) {
            return;
        }
    }};
}

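/// `StateTableInner` is the interface for streaming executors to read and write table state,
/// backed by a local state store. Keys are vnode-prefixed, memcomparable-encoded primary
/// keys; values are value-encoded rows.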
#[derive(Clone)]
pub struct StateTableInner<
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Id of this table.
    table_id: TableId,

    /// Buffer for uncommitted writes, backed by the state store.
    local_store: S::Local,

    /// The global state store, used for snapshot reads and change-log iteration.
    store: S,

    /// The current epoch pair; `None` until `init_epoch` is called.
    epoch: Option<EpochPair>,

    /// Serializer/deserializer for the (memcomparable) primary key.
    pk_serde: OrderedRowSerde,

    /// Serializer/deserializer for the row value.
    row_serde: Arc<SD>,

    /// Indices of the primary key columns, based on the full table schema.
    pk_indices: Vec<usize>,

    /// How rows are distributed across vnodes.
    distribution: TableDistribution,

    /// Length of the pk prefix that can serve as a bloom-filter hint for reads.
    prefix_hint_len: usize,

    /// Table options from the catalog, e.g. retention (TTL).
    table_option: TableOption,

    /// Indices of the columns actually stored in the value; `None` means all columns.
    value_indices: Option<Vec<usize>>,

    /// The most recently updated, but not yet committed, state-cleaning watermark.
    pending_watermark: Option<ScalarImpl>,
    /// The watermark that has already been committed for state cleaning.
    committed_watermark: Option<ScalarImpl>,
    /// Cache of the smallest watermark-column pks, to avoid unnecessary range deletes.
    watermark_cache: StateTableWatermarkCache,

    /// Data types of all columns, used to build chunks from rows.
    data_types: Vec<DataType>,

    /// Mapping from the full ("input") schema to the replicated table's output schema, used
    /// to reconstruct full rows when writing to a replicated state table.
    i2o_mapping: ColIndexMapping,

    /// Output column indices, only meaningful for replicated state tables.
    output_indices: Vec<usize>,

    op_consistency_level: StateTableOpConsistencyLevel,

    /// Pk column index used for state cleaning; `None` means the first pk column.
    clean_watermark_index_in_pk: Option<i32>,

    /// Whether `commit` has been called but the returned `StateTablePostCommit` has not yet
    /// been consumed via `post_yield_barrier`.
    on_post_commit: bool,
}

/// The standard state table, using `BasicSerde` for values.
pub type StateTable<S> = StateTableInner<S, BasicSerde>;
/// A state table that replicates another table, projecting output columns.
pub type ReplicatedStateTable<S, SD> = StateTableInner<S, SD, true>;
/// A state table with the watermark cache enabled, to reduce state-cleaning overhead.
pub type WatermarkCacheStateTable<S> = StateTableInner<S, BasicSerde, false, true>;
pub type WatermarkCacheParameterizedStateTable<S, const USE_WATERMARK_CACHE: bool> =
    StateTableInner<S, BasicSerde, false, USE_WATERMARK_CACHE>;

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
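    /// Initialize the state table with the epoch pair carried by the first barrier.
    /// Must be called exactly once, before any other read or write.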
    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.local_store.init(InitOptions::new(epoch)).await?;
        assert_eq!(None, self.epoch.replace(epoch), "should not init twice");
        Ok(())
    }

    pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
        self.store
            .try_wait_epoch(
                HummockReadEpoch::Committed(prev_epoch),
                TryWaitEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await
    }

    pub fn state_store(&self) -> &S {
        &self.store
    }
}

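/// Build an [`OpConsistencyLevel::ConsistentOldValue`] whose checker compares a written old
/// value against the stored one, falling back to comparing the deserialized rows when the
/// raw bytes differ (equal rows may have different serializations).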
fn consistent_old_value_op(
    row_serde: Arc<impl ValueRowSerde>,
    is_log_store: bool,
) -> OpConsistencyLevel {
    OpConsistencyLevel::ConsistentOldValue {
        check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
            if first == second {
                return true;
            }
            let first = match row_serde.deserialize(first) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?first, "fail to deserialize serialized value");
                    return false;
                }
            };
            let second = match row_serde.deserialize(second) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?second, "fail to deserialize serialized value");
                    return false;
                }
            };
            if first != second {
                error!(first = ?first, second = ?second, "sanity check fail");
                false
            } else {
                true
            }
        }),
        is_log_store,
    }
}

#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
    /// Ops are inconsistent: no sanity check is performed on writes.
    Inconsistent,
    /// Ops are consistent:
    /// - an insert must not overwrite an existing key;
    /// - a delete or update must target an existing key, and the previous value must match
    ///   the passed old value.
    ConsistentOldValue,
    /// Same consistency requirement as `ConsistentOldValue`, and additionally the old value
    /// is flushed and stored for the log store.
    LogStoreEnabled,
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
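    /// Create a state table from the table catalog, with the default consistency level
    /// `ConsistentOldValue`.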
    pub async fn from_table_catalog(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        Self::from_table_catalog_with_consistency_level(
            table_catalog,
            store,
            vnodes,
            StateTableOpConsistencyLevel::ConsistentOldValue,
        )
        .await
    }

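    /// Create a state table with the op consistency check disabled.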
    pub async fn from_table_catalog_inconsistent_op(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        Self::from_table_catalog_with_consistency_level(
            table_catalog,
            store,
            vnodes,
            StateTableOpConsistencyLevel::Inconsistent,
        )
        .await
    }

    pub async fn from_table_catalog_with_consistency_level(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        consistency_level: StateTableOpConsistencyLevel,
    ) -> Self {
        Self::from_table_catalog_inner(table_catalog, store, vnodes, consistency_level, vec![])
            .await
    }

    /// Create a state table from the table catalog, with the given consistency level and
    /// output column ids (the latter is only meaningful for replicated tables).
    async fn from_table_catalog_inner(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        op_consistency_level: StateTableOpConsistencyLevel,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        let table_id = TableId::new(table_catalog.id);
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();
        let data_types: Vec<DataType> = table_catalog
            .columns
            .iter()
            .map(|col| {
                col.get_column_desc()
                    .unwrap()
                    .get_column_type()
                    .unwrap()
                    .into()
            })
            .collect();
        let order_types: Vec<OrderType> = table_catalog
            .pk
            .iter()
            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
            .collect();
        let dist_key_indices: Vec<usize> = table_catalog
            .distribution_key
            .iter()
            .map(|dist_index| *dist_index as usize)
            .collect();

        let pk_indices = table_catalog
            .pk
            .iter()
            .map(|col_order| col_order.column_index as usize)
            .collect_vec();

        // Compute the positions of the distribution key columns within the pk.
        let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() {
            get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices).unwrap()
        } else {
            table_catalog
                .get_dist_key_in_pk()
                .iter()
                .map(|idx| *idx as usize)
                .collect()
        };

        let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
            let vnode_col_idx = *idx as usize;
            pk_indices.iter().position(|&i| vnode_col_idx == i)
        });

        let distribution =
            TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
        assert_eq!(
            distribution.vnode_count(),
            table_catalog.vnode_count(),
            "vnode count mismatch, scanning table {} under wrong distribution?",
            table_catalog.name,
        );

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);

        let input_value_indices = table_catalog
            .value_indices
            .iter()
            .map(|val| *val as usize)
            .collect_vec();

        let no_shuffle_value_indices = (0..table_columns.len()).collect_vec();

        // If the value indices are the trivial full sequence, leave them as `None`.
        let value_indices = match input_value_indices.len() == table_columns.len()
            && input_value_indices == no_shuffle_value_indices
        {
            true => None,
            false => Some(input_value_indices),
        };
        let prefix_hint_len = table_catalog.read_prefix_len_hint as usize;

        let row_serde = Arc::new(SD::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.clone().into_boxed_slice()),
        ));

        let state_table_op_consistency_level = op_consistency_level;
        let op_consistency_level = match op_consistency_level {
            StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
            StateTableOpConsistencyLevel::ConsistentOldValue => {
                consistent_old_value_op(row_serde.clone(), false)
            }
            StateTableOpConsistencyLevel::LogStoreEnabled => {
                consistent_old_value_op(row_serde.clone(), true)
            }
        };

        let table_option = TableOption::new(table_catalog.retention_seconds);
        let new_local_options = if IS_REPLICATED {
            NewLocalOptions::new_replicated(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
            )
        } else {
            NewLocalOptions::new(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
                true,
            )
        };
        let local_state_store = store.new_local(new_local_options).await;

        // Column-aware row encoding must be used if and only if the table is versioned.
        assert_eq!(
            table_catalog.version.is_some(),
            row_serde.kind().is_column_aware()
        );

        // Restore the committed state-cleaning watermark from the state store, if any.
        let watermark_serde = if pk_indices.is_empty() {
            None
        } else {
            match table_catalog.clean_watermark_index_in_pk {
                None => Some(pk_serde.index(0)),
                Some(clean_watermark_index_in_pk) => {
                    Some(pk_serde.index(clean_watermark_index_in_pk as usize))
                }
            }
        };
        let max_watermark_of_vnodes = distribution
            .vnodes()
            .iter_vnodes()
            .filter_map(|vnode| local_state_store.get_table_watermark(vnode))
            .max();
        let committed_watermark = if let Some(deser) = watermark_serde
            && let Some(max_watermark) = max_watermark_of_vnodes
        {
            let deserialized = deser.deserialize(&max_watermark).ok().and_then(|row| {
                assert!(row.len() == 1);
                row[0].clone()
            });
            if deserialized.is_none() {
                tracing::error!(
                    vnodes = ?distribution.vnodes(),
                    watermark = ?max_watermark,
                    "Failed to deserialize persisted watermark from state store.",
                );
            }
            deserialized
        } else {
            None
        };

        let watermark_cache = if USE_WATERMARK_CACHE {
            StateTableWatermarkCache::new(WATERMARK_CACHE_ENTRIES)
        } else {
            StateTableWatermarkCache::new(0)
        };

        // Compute the mapping from the full schema to the output columns (replication only).
        let output_column_ids_to_input_idx = output_column_ids
            .iter()
            .enumerate()
            .map(|(pos, id)| (*id, pos))
            .collect::<HashMap<_, _>>();

        let columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|c| c.column_desc.as_ref().unwrap().into())
            .collect_vec();

        let mut i2o_mapping = vec![None; columns.len()];
        for (i, column) in columns.iter().enumerate() {
            if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
                i2o_mapping[i] = Some(*pos);
            }
        }
        let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());

        let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);

        Self {
            table_id,
            local_store: local_state_store,
            store,
            epoch: None,
            pk_serde,
            row_serde,
            pk_indices,
            distribution,
            prefix_hint_len,
            table_option,
            value_indices,
            pending_watermark: None,
            committed_watermark,
            watermark_cache,
            data_types,
            output_indices,
            i2o_mapping,
            op_consistency_level: state_table_op_consistency_level,
            clean_watermark_index_in_pk: table_catalog.clean_watermark_index_in_pk,
            on_post_commit: false,
        }
    }

    pub fn get_data_types(&self) -> &[DataType] {
        &self.data_types
    }

    pub fn table_id(&self) -> u32 {
        self.table_id.table_id
    }

    /// Get the vnode value of the given primary-key prefix. For streaming, the prefix must be
    /// enough to determine the vnode.
    fn compute_prefix_vnode(&self, pk_prefix: impl Row) -> VirtualNode {
        self.distribution
            .try_compute_vnode_by_pk_prefix(pk_prefix)
            .expect("For streaming, the given prefix must be enough to calculate the vnode")
    }

    /// Get the vnode value of the given full primary key.
    pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
        self.distribution.compute_vnode_by_pk(pk)
    }

    /// Indices of the primary key columns, based on the full table schema rather than the
    /// output columns.
    pub fn pk_indices(&self) -> &[usize] {
        &self.pk_indices
    }

    /// Get the pk indices in terms of the output schema of a replicated state table.
    /// Returns `None` if some pk column is not part of the output.
    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
        assert!(IS_REPLICATED);
        self.pk_indices
            .iter()
            .map(|&i| self.output_indices.iter().position(|&j| i == j))
            .collect()
    }

    pub fn pk_serde(&self) -> &OrderedRowSerde {
        &self.pk_serde
    }

    pub fn vnodes(&self) -> &Arc<Bitmap> {
        self.distribution.vnodes()
    }

    pub fn value_indices(&self) -> &Option<Vec<usize>> {
        &self.value_indices
    }

    /// Whether writes are sanity-checked against the old value.
    pub fn is_consistent_op(&self) -> bool {
        matches!(
            self.op_consistency_level,
            StateTableOpConsistencyLevel::ConsistentOldValue
                | StateTableOpConsistencyLevel::LogStoreEnabled
        )
    }
}

impl<S, SD, const USE_WATERMARK_CACHE: bool> StateTableInner<S, SD, true, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
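    /// Create a replicated state table whose output is projected to `output_column_ids`.
    /// Op consistency is disabled, since the table only replicates upstream writes.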
    pub async fn from_table_catalog_with_output_column_ids(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        Self::from_table_catalog_inner(
            table_catalog,
            store,
            vnodes,
            StateTableOpConsistencyLevel::Inconsistent,
            output_column_ids,
        )
        .await
    }
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
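    /// Get a single row by the full primary key, deserialized into an `OwnedRow`. For
    /// replicated tables, the row is projected to the output indices.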
    pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
        let row_serde = self.row_serde.clone();
        let row = self
            .get_inner(pk, move |_, value| Ok(row_serde.deserialize(value)?))
            .await?;
        match row {
            Some(row) => {
                if IS_REPLICATED {
                    let row = row.project(&self.output_indices);
                    Ok(Some(row.into_owned_row()))
                } else {
                    Ok(Some(OwnedRow::new(row)))
                }
            }
            None => Ok(None),
        }
    }

    pub async fn get_encoded_row(&self, pk: impl Row) -> StreamExecutorResult<Option<Bytes>> {
        self.get_inner(pk, |_, value| Ok(Bytes::copy_from_slice(value)))
            .await
    }

    async fn get_inner<O: Send + 'static>(
        &self,
        pk: impl Row,
        on_key_value_fn: impl risingwave_storage::store::KeyValueFn<O>,
    ) -> StreamExecutorResult<Option<O>> {
        assert!(pk.len() <= self.pk_indices.len());

        let serialized_pk =
            serialize_pk_with_vnode(&pk, &self.pk_serde, self.compute_vnode_by_pk(&pk));

        let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() {
            Some(serialized_pk.slice(VirtualNode::SIZE..))
        } else {
            #[cfg(debug_assertions)]
            if self.prefix_hint_len != 0 {
                warn!(
                    "prefix_hint_len is not equal to pk.len(), may not be able to utilize bloom filter"
                );
            }
            None
        };

        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };

        self.local_store
            .on_key_value(serialized_pk, read_options, on_key_value_fn)
            .await
            .map_err(Into::into)
    }
}

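/// Post-commit handle returned by [`StateTableInner::commit`]. The caller must call
/// `post_yield_barrier` after yielding the barrier of the committed epoch, so that vnode
/// bitmap updates (on scaling) happen at the correct point of the barrier protocol.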
#[must_use]
pub struct StateTablePostCommit<
    'a,
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    inner: &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
}

impl<'a, S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTablePostCommit<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
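    /// Finish the commit. When `new_vnodes` is `Some` (i.e. the actor is being rescheduled),
    /// the vnode bitmap is updated and `Some(((new_vnodes, old_vnodes, table), cache_may_stale))`
    /// is returned; otherwise `None`.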
    pub async fn post_yield_barrier(
        mut self,
        new_vnodes: Option<Arc<Bitmap>>,
    ) -> StreamExecutorResult<
        Option<(
            (
                Arc<Bitmap>,
                Arc<Bitmap>,
                &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
            ),
            bool,
        )>,
    > {
        self.inner.on_post_commit = false;
        Ok(if let Some(new_vnodes) = new_vnodes {
            let (old_vnodes, cache_may_stale) =
                self.update_vnode_bitmap(new_vnodes.clone()).await?;
            Some(((new_vnodes, old_vnodes, self.inner), cache_may_stale))
        } else {
            None
        })
    }

    pub fn inner(&self) -> &StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE> {
        &*self.inner
    }

    /// Update the vnode bitmap of the state table, returning the previous vnode bitmap and
    /// whether executor caches over this table may now be stale.
    async fn update_vnode_bitmap(
        &mut self,
        new_vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<(Arc<Bitmap>, bool)> {
        let prev_vnodes = self
            .inner
            .local_store
            .update_vnode_bitmap(new_vnodes.clone())
            .await?;
        assert_eq!(
            &prev_vnodes,
            self.inner.vnodes(),
            "state table and state store vnode bitmap mismatches"
        );

        if self.inner.distribution.is_singleton() {
            assert_eq!(
                &new_vnodes,
                self.inner.vnodes(),
                "should not update vnode bitmap for singleton table"
            );
        }
        assert_eq!(self.inner.vnodes().len(), new_vnodes.len());

        let cache_may_stale = cache_may_stale(self.inner.vnodes(), &new_vnodes);

        if cache_may_stale {
            self.inner.pending_watermark = None;
            if USE_WATERMARK_CACHE {
                self.inner.watermark_cache.clear();
            }
        }

        Ok((
            self.inner.distribution.update_vnode_bitmap(new_vnodes),
            cache_may_stale,
        ))
    }
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    fn handle_mem_table_error(&self, e: StorageError) {
        let e = match e.into_inner() {
            ErrorKind::MemTable(e) => e,
            _ => unreachable!("should only get memtable error"),
        };
        match *e {
            MemTableError::InconsistentOperation { key, prev, new } => {
                let (vnode, key) = deserialize_pk_with_vnode(&key, &self.pk_serde).unwrap();
                panic!(
                    "mem-table operation inconsistent! table_id: {}, vnode: {}, key: {:?}, prev: {}, new: {}",
                    self.table_id(),
                    vnode,
                    &key,
                    prev.debug_fmt(&*self.row_serde),
                    new.debug_fmt(&*self.row_serde),
                )
            }
        }
    }

    fn serialize_value(&self, value: impl Row) -> Bytes {
        if let Some(value_indices) = self.value_indices.as_ref() {
            self.row_serde
                .serialize(value.project(value_indices))
                .into()
        } else {
            self.row_serde.serialize(value).into()
        }
    }

    fn insert_inner(&mut self, key: TableKey<Bytes>, value_bytes: Bytes) {
        insane_mode_discard_point!();
        self.local_store
            .insert(key, value_bytes, None)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn delete_inner(&mut self, key: TableKey<Bytes>, value_bytes: Bytes) {
        insane_mode_discard_point!();
        self.local_store
            .delete(key, value_bytes)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn update_inner(
        &mut self,
        key_bytes: TableKey<Bytes>,
        old_value_bytes: Option<Bytes>,
        new_value_bytes: Bytes,
    ) {
        insane_mode_discard_point!();
        self.local_store
            .insert(key_bytes, new_value_bytes, old_value_bytes)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

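    /// Insert a row into the state table. Must provide a full row that matches the table's
    /// column descs; the pk is extracted via `pk_indices`.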
    pub fn insert(&mut self, value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&value).project(pk_indices);
        if USE_WATERMARK_CACHE {
            self.watermark_cache.insert(&pk);
        }

        let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk));
        let value_bytes = self.serialize_value(value);
        self.insert_inner(key_bytes, value_bytes);
    }

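    /// Delete a row from the state table. Must provide the full old row.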
    pub fn delete(&mut self, old_value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&old_value).project(pk_indices);
        if USE_WATERMARK_CACHE {
            self.watermark_cache.delete(&pk);
        }

        let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk));
        let value_bytes = self.serialize_value(old_value);
        self.delete_inner(key_bytes, value_bytes);
    }

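    /// Update a row. The old and new value must have the same pk.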
    pub fn update(&mut self, old_value: impl Row, new_value: impl Row) {
        let old_pk = (&old_value).project(self.pk_indices());
        let new_pk = (&new_value).project(self.pk_indices());
        debug_assert!(
            Row::eq(&old_pk, new_pk),
            "pk should not change: {old_pk:?} vs {new_pk:?}. {}",
            self.table_id
        );

        let new_key_bytes =
            serialize_pk_with_vnode(new_pk, &self.pk_serde, self.compute_vnode_by_pk(new_pk));
        let old_value_bytes = self.serialize_value(old_value);
        let new_value_bytes = self.serialize_value(new_value);

        self.update_inner(new_key_bytes, Some(old_value_bytes), new_value_bytes);
    }

    /// Write a record (insert / delete / update) into the state table.
    pub fn write_record(&mut self, record: Record<impl Row>) {
        match record {
            Record::Insert { new_row } => self.insert(new_row),
            Record::Delete { old_row } => self.delete(old_row),
            Record::Update { old_row, new_row } => self.update(old_row, new_row),
        }
    }

    fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
        fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
    }

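    /// Write a `StreamChunk` into the state table. The chunk must have the same schema as the
    /// table; for replicated tables, non-output columns are first filled with NULLs.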
    #[allow(clippy::disallowed_methods)]
    pub fn write_chunk(&mut self, chunk: StreamChunk) {
        let chunk = if IS_REPLICATED {
            self.fill_non_output_indices(chunk)
        } else {
            chunk
        };
        let (chunk, op) = chunk.into_parts();

        let vnodes = self
            .distribution
            .compute_chunk_vnode(&chunk, &self.pk_indices);

        let values = if let Some(ref value_indices) = self.value_indices {
            chunk
                .project(value_indices)
                .serialize_with(&*self.row_serde)
        } else {
            chunk.serialize_with(&*self.row_serde)
        };

        // Serialize the vnode-prefixed key for each row position (including invisible ones),
        // so that positions stay aligned with `values`.
        let key_chunk = chunk.project(self.pk_indices());
        let vnode_and_pks = key_chunk
            .rows_with_holes()
            .zip_eq_fast(vnodes.iter())
            .map(|(r, vnode)| {
                let mut buffer = BytesMut::new();
                buffer.put_slice(&vnode.to_be_bytes()[..]);
                if let Some(r) = r {
                    self.pk_serde.serialize(r, &mut buffer);
                }
                (r, buffer.freeze())
            })
            .collect_vec();

        if !key_chunk.is_compacted() {
            for ((op, (key, key_bytes), value), vis) in
                izip!(op.iter(), vnode_and_pks, values).zip_eq_debug(key_chunk.visibility().iter())
            {
                if vis {
                    match op {
                        Op::Insert | Op::UpdateInsert => {
                            if USE_WATERMARK_CACHE && let Some(ref pk) = key {
                                self.watermark_cache.insert(pk);
                            }
                            self.insert_inner(TableKey(key_bytes), value);
                        }
                        Op::Delete | Op::UpdateDelete => {
                            if USE_WATERMARK_CACHE && let Some(ref pk) = key {
                                self.watermark_cache.delete(pk);
                            }
                            self.delete_inner(TableKey(key_bytes), value);
                        }
                    }
                }
            }
        } else {
            for (op, (key, key_bytes), value) in izip!(op.iter(), vnode_and_pks, values) {
                match op {
                    Op::Insert | Op::UpdateInsert => {
                        if USE_WATERMARK_CACHE && let Some(ref pk) = key {
                            self.watermark_cache.insert(pk);
                        }
                        self.insert_inner(TableKey(key_bytes), value);
                    }
                    Op::Delete | Op::UpdateDelete => {
                        if USE_WATERMARK_CACHE && let Some(ref pk) = key {
                            self.watermark_cache.delete(pk);
                        }
                        self.delete_inner(TableKey(key_bytes), value);
                    }
                }
            }
        }
    }

    /// Update the watermark used for state cleaning. The watermark is buffered and only
    /// applied on the next `commit`.
    pub fn update_watermark(&mut self, watermark: ScalarImpl) {
        trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark");
        self.pending_watermark = Some(watermark);
    }

    /// Get the latest watermark that has been committed for state cleaning.
    pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> {
        self.committed_watermark.as_ref()
    }

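    /// Commit the buffered writes to the state store and seal the current epoch, advancing to
    /// `new_epoch`. The returned `StateTablePostCommit` must be consumed via
    /// `post_yield_barrier` after the barrier is yielded.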
    pub async fn commit(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        self.commit_inner(new_epoch, None).await
    }

    #[cfg(test)]
    pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
        self.commit_assert_no_update_vnode_bitmap(new_epoch).await
    }

    pub async fn commit_assert_no_update_vnode_bitmap(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<()> {
        let post_commit = self.commit_inner(new_epoch, None).await?;
        post_commit.post_yield_barrier(None).await?;
        Ok(())
    }

    pub async fn commit_may_switch_consistent_op(
        &mut self,
        new_epoch: EpochPair,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        if self.op_consistency_level != op_consistency_level {
            info!(
                ?new_epoch,
                prev_op_consistency_level = ?self.op_consistency_level,
                ?op_consistency_level,
                table_id = self.table_id.table_id,
                "switch to new op consistency level"
            );
            self.commit_inner(new_epoch, Some(op_consistency_level))
                .await
        } else {
            self.commit_inner(new_epoch, None).await
        }
    }

    async fn commit_inner(
        &mut self,
        new_epoch: EpochPair,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        assert!(!self.on_post_commit);
        assert_eq!(
            self.epoch.expect("should only be called after init").curr,
            new_epoch.prev
        );
        let switch_op_consistency_level = switch_consistent_op.map(|new_consistency_level| {
            assert_ne!(self.op_consistency_level, new_consistency_level);
            self.op_consistency_level = new_consistency_level;
            match new_consistency_level {
                StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
                StateTableOpConsistencyLevel::ConsistentOldValue => {
                    consistent_old_value_op(self.row_serde.clone(), false)
                }
                StateTableOpConsistencyLevel::LogStoreEnabled => {
                    consistent_old_value_op(self.row_serde.clone(), true)
                }
            }
        });
        trace!(
            table_id = %self.table_id,
            epoch = ?self.epoch,
            "commit state table"
        );

        self.local_store
            .flush()
            .instrument(tracing::info_span!("state_table_flush"))
            .await?;
        let table_watermarks = self.commit_pending_watermark();
        self.local_store.seal_current_epoch(
            new_epoch.curr,
            SealCurrentEpochOptions {
                table_watermarks,
                switch_op_consistency_level,
            },
        );
        self.epoch = Some(new_epoch);

        // If the watermark cache is out of sync (e.g. after it was cleared), refill it with
        // the smallest pks at or above the committed watermark, merged across all vnodes.
        if USE_WATERMARK_CACHE
            && !self.watermark_cache.is_synced()
            && let Some(ref watermark) = self.committed_watermark
        {
            let range: (Bound<Once<Datum>>, Bound<Once<Datum>>) =
                (Included(once(Some(watermark.clone()))), Unbounded);
            let mut pks = Vec::with_capacity(self.watermark_cache.capacity());
            {
                let mut streams = vec![];
                for vnode in self.vnodes().iter_vnodes() {
                    let stream = self
                        .iter_keyed_row_with_vnode(vnode, &range, PrefetchOptions::default())
                        .await?;
                    streams.push(Box::pin(stream));
                }
                let merged_stream = merge_sort(streams);
                pin_mut!(merged_stream);

                #[for_await]
                for entry in merged_stream.take(self.watermark_cache.capacity()) {
                    let keyed_row = entry?;
                    let pk = self.pk_serde.deserialize(keyed_row.key())?;
                    // Only cache pks whose watermark column is non-NULL.
                    if !pk.is_null_at(self.clean_watermark_index_in_pk.unwrap_or(0) as usize) {
                        pks.push(pk);
                    }
                }
            }

            let mut filler = self.watermark_cache.begin_syncing();
            for pk in pks {
                filler.insert_unchecked(DefaultOrdered(pk), ());
            }
            filler.finish();

            let n_cache_entries = self.watermark_cache.len();
            if n_cache_entries < self.watermark_cache.capacity() {
                self.watermark_cache.set_table_row_count(n_cache_entries);
            }
        }

        self.on_post_commit = true;
        Ok(StateTablePostCommit { inner: self })
    }

    /// Commit the pending watermark, returning the table watermarks to seal with the current
    /// epoch for state cleaning, if any.
    fn commit_pending_watermark(
        &mut self,
    ) -> Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)> {
        let watermark = self.pending_watermark.take()?;
        trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning");

        assert!(
            !self.pk_indices().is_empty(),
            "see pending watermark on empty pk"
        );
        let watermark_serializer = {
            match self.clean_watermark_index_in_pk {
                None => self.pk_serde.index(0),
                Some(clean_watermark_index_in_pk) => {
                    self.pk_serde.index(clean_watermark_index_in_pk as usize)
                }
            }
        };

        let watermark_type = match self.clean_watermark_index_in_pk {
            None => WatermarkSerdeType::PkPrefix,
            Some(clean_watermark_index_in_pk) => match clean_watermark_index_in_pk {
                0 => WatermarkSerdeType::PkPrefix,
                _ => WatermarkSerdeType::NonPkPrefix,
            },
        };

        // Only issue a range delete if the smallest cached pk does not exceed the watermark;
        // when the cache is not synced, conservatively assume cleaning is needed.
        let should_clean_watermark = {
            if USE_WATERMARK_CACHE && self.watermark_cache.is_synced() {
                if let Some(key) = self.watermark_cache.lowest_key() {
                    watermark.as_scalar_ref_impl().default_cmp(&key).is_ge()
                } else {
                    false
                }
            } else {
                true
            }
        };

        let watermark_suffix =
            serialize_pk(row::once(Some(watermark.clone())), &watermark_serializer);

        let seal_watermark = if should_clean_watermark {
            trace!(table_id = %self.table_id, watermark = ?watermark_suffix, vnodes = ?{
                self.vnodes().iter_vnodes().collect_vec()
            }, "delete range");

            let order_type = watermark_serializer.get_order_types().first().unwrap();

            let direction = if order_type.is_ascending() {
                WatermarkDirection::Ascending
            } else {
                WatermarkDirection::Descending
            };
            Some((
                direction,
                VnodeWatermark::new(
                    self.vnodes().clone(),
                    Bytes::copy_from_slice(watermark_suffix.as_ref()),
                ),
                watermark_type,
            ))
        } else {
            None
        };
        self.committed_watermark = Some(watermark);

        // The cached keys may be removed by the range delete, so clear the cache; it will be
        // refilled on the next commit.
        if USE_WATERMARK_CACHE && seal_watermark.is_some() {
            self.watermark_cache.clear();
        }

        seal_watermark.map(|(direction, watermark, is_non_pk_prefix)| {
            (direction, vec![watermark], is_non_pk_prefix)
        })
    }

    pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.local_store.try_flush().await?;
        Ok(())
    }
}

pub trait RowStream<'a> = Stream<Item = StreamExecutorResult<OwnedRow>> + 'a;
pub trait KeyedRowStream<'a> = Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a;
pub trait PkRowStream<'a, K> = Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a;

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
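    /// Iterate over rows under the given vnode within the pk range, in ascending pk order.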
    pub async fn iter_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(deserialize_keyed_row_stream::<'_, ()>(
            self.iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
                .await?,
            &*self.row_serde,
        )
        .map_ok(|(_, row)| row))
    }

    pub async fn iter_keyed_row_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(deserialize_keyed_row_stream(
            self.iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
                .await?,
            &*self.row_serde,
        )
        .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

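    /// Same as `iter_with_vnode`, but projects each row to the replicated table's output
    /// indices.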
    pub async fn iter_with_vnode_and_output_indices(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        assert!(IS_REPLICATED);
        let stream = self
            .iter_with_vnode(vnode, pk_range, prefetch_options)
            .await?;
        Ok(stream.map(|row| row.map(|row| row.project(&self.output_indices).into_owned_row())))
    }

    async fn iter_kv(
        &self,
        table_key_range: TableKeyRange,
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<<S::Local as LocalStateStore>::Iter<'_>> {
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(self.local_store.iter(table_key_range, read_options).await?)
    }

    async fn rev_iter_kv(
        &self,
        table_key_range: TableKeyRange,
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<<S::Local as LocalStateStore>::RevIter<'_>> {
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(self
            .local_store
            .rev_iter(table_key_range, read_options)
            .await?)
    }

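    /// Iterate over rows whose pk starts with `pk_prefix`, optionally further restricted by
    /// `sub_range` over the remaining pk columns.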
    pub async fn iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        let stream = self
            .iter_with_prefix_inner::<false, ()>(pk_prefix, sub_range, prefetch_options)
            .await?;
        Ok(stream.map_ok(|(_, row)| row))
    }

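    /// Get the row from a table that holds at most one row, asserting that a second row does
    /// not exist.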
    pub async fn get_from_one_row_table(&self) -> StreamExecutorResult<Option<OwnedRow>> {
        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
        let stream = self
            .iter_with_prefix(row::empty(), sub_range, Default::default())
            .await?;
        pin_mut!(stream);

        if let Some(res) = stream.next().await {
            let value = res?.into_owned_row();
            assert!(stream.next().await.is_none());
            Ok(Some(value))
        } else {
            Ok(None)
        }
    }

    /// Get the scalar value from a table that holds at most one row with a single value
    /// column.
    pub async fn get_from_one_value_table(&self) -> StreamExecutorResult<Option<ScalarImpl>> {
        Ok(self
            .get_from_one_row_table()
            .await?
            .and_then(|row| row[0].clone()))
    }

    pub async fn iter_keyed_row_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::<false, Bytes>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

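    /// Same as `iter_with_prefix`, but iterates in reverse pk order.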
    pub async fn rev_iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::<true, ()>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

    async fn iter_with_prefix_inner<const REVERSE: bool, K: CopyFromSlice>(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
        let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);

        // The pk prefix is expected to be enough to determine the vnode.
        let vnode = self.compute_prefix_vnode(&pk_prefix);

        // Construct the prefix hint for the bloom filter, if the prefix covers the hint length.
        let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
        if self.prefix_hint_len != 0 {
            debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
        }
        let prefix_hint = {
            if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() {
                None
            } else {
                let encoded_prefix_len = self
                    .pk_serde
                    .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;

                Some(Bytes::copy_from_slice(
                    &encoded_prefix[..encoded_prefix_len],
                ))
            }
        };

        trace!(
            table_id = %self.table_id(),
            ?prefix_hint, ?pk_prefix,
            ?pk_prefix_indices,
            iter_direction = if REVERSE { "reverse" } else { "forward" },
            "storage_iter_with_prefix"
        );

        let memcomparable_range =
            prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);

        let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode);

        Ok(if REVERSE {
            futures::future::Either::Left(deserialize_keyed_row_stream(
                self.rev_iter_kv(
                    memcomparable_range_with_vnode,
                    prefix_hint,
                    prefetch_options,
                )
                .await?,
                &*self.row_serde,
            ))
        } else {
            futures::future::Either::Right(deserialize_keyed_row_stream(
                self.iter_kv(
                    memcomparable_range_with_vnode,
                    prefix_hint,
                    prefetch_options,
                )
                .await?,
                &*self.row_serde,
            ))
        })
    }

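    /// Iterate raw key-value pairs over the given pk range under one vnode, without
    /// deserializing the rows.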
    async fn iter_kv_with_pk_range(
        &self,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<<S::Local as LocalStateStore>::Iter<'_>> {
        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);
        let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode);

        self.iter_kv(memcomparable_range_with_vnode, None, prefetch_options)
            .await
    }

    #[cfg(test)]
    pub fn get_watermark_cache(&self) -> &StateTableWatermarkCache {
        &self.watermark_cache
    }
}


impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
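    /// Iterate over the change log of this table within `epoch_range`, for the given vnode
    /// and pk range.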
    pub async fn iter_log_with_vnode(
        &self,
        vnode: VirtualNode,
        epoch_range: (u64, u64),
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
    ) -> StreamExecutorResult<impl Stream<Item = StreamExecutorResult<ChangeLogRow>> + '_> {
        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);
        let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode);
        Ok(deserialize_log_stream(
            self.store
                .iter_log(
                    epoch_range,
                    memcomparable_range_with_vnode,
                    ReadLogOptions {
                        table_id: self.table_id,
                    },
                )
                .await?,
            &*self.row_serde,
        )
        .map_err(Into::into))
    }
}

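/// Deserialize a raw kv iterator into a stream of `(key, row)` pairs; the key type `K`
/// controls whether the serialized table key is kept (`Bytes`) or dropped (`()`).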
fn deserialize_keyed_row_stream<'a, K: CopyFromSlice>(
    iter: impl StateStoreIter + 'a,
    deserializer: &'a impl ValueRowSerde,
) -> impl PkRowStream<'a, K> {
    iter.into_stream(move |(key, value)| {
        Ok((
            K::copy_from_slice(key.user_key.table_key.as_ref()),
            deserializer.deserialize(value).map(OwnedRow::new)?,
        ))
    })
    .map_err(Into::into)
}

pub fn prefix_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    range: &(Bound<impl Row>, Bound<impl Row>),
) -> (Bound<Bytes>, Bound<Bytes>) {
    (
        start_range_to_memcomparable(pk_serde, &range.0),
        end_range_to_memcomparable(pk_serde, &range.1, None),
    )
}

fn prefix_and_sub_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    sub_range: &(Bound<impl Row>, Bound<impl Row>),
    pk_prefix: impl Row,
) -> (Bound<Bytes>, Bound<Bytes>) {
    let (range_start, range_end) = sub_range;
    let prefix_serializer = pk_serde.prefix(pk_prefix.len());
    let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
    let start_range = match range_start {
        Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))),
        Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))),
        Unbounded => Bound::Included(Either::Right(&pk_prefix)),
    };
    let end_range = match range_end {
        Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)),
        Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)),
        Unbounded => Unbounded,
    };
    (
        start_range_to_memcomparable(pk_serde, &start_range),
        end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
    )
}

fn start_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => Unbounded,
        Included(r) => {
            let serialized = serialize_pk_prefix(r);
            Included(serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            start_bound_of_excluded_prefix(&serialized)
        }
    }
}

fn end_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
    serialized_pk_prefix: Option<Bytes>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => match serialized_pk_prefix {
            Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix),
            None => Unbounded,
        },
        Included(r) => {
            let serialized = serialize_pk_prefix(r);
            end_bound_of_prefix(&serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            Excluded(serialized)
        }
    }
}

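/// Pad a replicated chunk to the full schema of the base table, filling columns that are not
/// part of the output with NULLs according to `i2o_mapping`.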
fn fill_non_output_indices(
    i2o_mapping: &ColIndexMapping,
    data_types: &[DataType],
    chunk: StreamChunk,
) -> StreamChunk {
    let cardinality = chunk.cardinality();
    let (ops, columns, vis) = chunk.into_inner();
    let mut full_columns = Vec::with_capacity(data_types.len());
    for (i, data_type) in data_types.iter().enumerate() {
        if let Some(j) = i2o_mapping.try_map(i) {
            full_columns.push(columns[j].clone());
        } else {
            let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
            column_builder.append_n_null(cardinality);
            let column: ArrayRef = column_builder.finish().into();
            full_columns.push(column)
        }
    }
    let data_chunk = DataChunk::new(full_columns, vis);
    StreamChunk::from_parts(ops, data_chunk)
}

#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    fn check(actual: impl Debug, expect: Expect) {
        let actual = format!("{:#?}", actual);
        expect.assert_eq(&actual);
    }

    #[test]
    fn test_fill_non_output_indices() {
        let data_types = vec![DataType::Int32, DataType::Int32, DataType::Int32];
        let replicated_chunk = [OwnedRow::new(vec![
            Some(222_i32.into()),
            Some(2_i32.into()),
        ])];
        let replicated_chunk = StreamChunk::from_parts(
            vec![Op::Insert],
            DataChunk::from_rows(&replicated_chunk, &[DataType::Int32, DataType::Int32]),
        );
        let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
        let filled_chunk = fill_non_output_indices(&i2o_mapping, &data_types, replicated_chunk);
        check(
            filled_chunk,
            expect![[r#"
                StreamChunk { cardinality: 1, capacity: 1, data:
                +---+---+---+-----+
                | + | 2 |   | 222 |
                +---+---+---+-----+
                }"#]],
        );
    }
}