use std::collections::HashMap;
use std::ops::Bound;
use std::ops::Bound::*;
use std::sync::Arc;

use bytes::{BufMut, Bytes, BytesMut};
use either::Either;
use foyer::Hint;
use futures::{Stream, StreamExt, TryStreamExt, pin_mut};
use futures_async_stream::for_await;
use itertools::{Itertools, izip};
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnDesc, ColumnId, TableId, TableOption, get_dist_key_in_pk_indices,
};
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
use risingwave_common::row::{self, Once, OwnedRow, Row, RowExt, once};
use risingwave_common::types::{DataType, Datum, DefaultOrd, DefaultOrdered, ScalarImpl};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{
    CopyFromSlice, TableKey, TableKeyRange, end_bound_of_prefix, prefixed_range_with_vnode,
    start_bound_of_excluded_prefix,
};
use risingwave_hummock_sdk::table_watermark::{
    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_pb::catalog::Table;
use risingwave_storage::StateStore;
use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::mem_table::MemTableError;
use risingwave_storage::row_serde::find_columns_by_ids;
use risingwave_storage::row_serde::row_serde_util::{
    deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode,
};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::*;
use risingwave_storage::table::merge_sort::merge_sort;
use risingwave_storage::table::{
    ChangeLogRow, KeyedRow, TableDistribution, deserialize_log_stream,
};
use thiserror_ext::AsReport;
use tracing::{Instrument, trace};

use crate::cache::cache_may_stale;
use crate::common::state_cache::{StateCache, StateCacheFiller};
use crate::common::table::state_table_cache::StateTableWatermarkCache;
use crate::executor::StreamExecutorResult;

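/// Capacity of the per-table watermark cache, used when `USE_WATERMARK_CACHE` is enabled.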
const WATERMARK_CACHE_ENTRIES: usize = 16;

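/// When the "insane" consistency-test mode is enabled, randomly discard the write with
/// probability 0.3 to exercise the handling of inconsistent state.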
macro_rules! insane_mode_discard_point {
    () => {{
        use rand::Rng;
        if crate::consistency::insane() && rand::rng().random_bool(0.3) {
            return;
        }
    }};
}

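/// `StateTableInner` is the interface for accessing relational data in KV (`StateStore`)
/// with row-based encoding. It is parameterized over the value serde `SD`, whether the
/// table is replicated, and whether a watermark cache is maintained.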
#[derive(Clone)]
pub struct StateTableInner<
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    table_id: TableId,

    /// Local (per-executor) state store used for writes and point lookups.
    local_store: S::Local,

    /// The global state store, used for log iteration and epoch waiting.
    store: S,

    /// Current epoch pair; `None` until `init_epoch` is called.
    epoch: Option<EpochPair>,

    /// Serializer/deserializer for the primary key (memcomparable encoding).
    pk_serde: OrderedRowSerde,

    /// Serializer/deserializer for the row values.
    row_serde: Arc<SD>,

    /// Indices of the primary key columns, based on the full table schema rather than the
    /// output columns.
    pk_indices: Vec<usize>,

    /// Vnode distribution of this table.
    distribution: TableDistribution,

    prefix_hint_len: usize,

    /// Table options, e.g. retention.
    table_option: TableOption,

    /// Indices of the value columns to persist, or `None` if all columns are persisted in
    /// their original order.
    value_indices: Option<Vec<usize>>,

    /// Watermark to be applied on the next `commit` for state cleaning.
    pending_watermark: Option<ScalarImpl>,
    /// The last watermark that has been committed to the state store.
    committed_watermark: Option<ScalarImpl>,
    /// Cache over the lowest keys, used to avoid issuing unnecessary state-cleaning watermarks.
    watermark_cache: StateTableWatermarkCache,

    /// Data types of all columns, used to build chunks from rows.
    data_types: Vec<DataType>,

    /// Mapping from the full (input) schema to the output schema; only meaningful for
    /// replicated state tables.
    i2o_mapping: ColIndexMapping,

    /// Indices of the output columns; only meaningful for replicated state tables.
    output_indices: Vec<usize>,

    op_consistency_level: StateTableOpConsistencyLevel,

    clean_watermark_index_in_pk: Option<i32>,

    /// Set by `commit`; cleared by `StateTablePostCommit::post_yield_barrier`.
    on_post_commit: bool,
}

pub type StateTable<S> = StateTableInner<S, BasicSerde>;
/// `StateTableInner` with the replication flag enabled.
pub type ReplicatedStateTable<S, SD> = StateTableInner<S, SD, true>;
/// `StateTableInner` with the watermark cache enabled.
pub type WatermarkCacheStateTable<S> = StateTableInner<S, BasicSerde, false, true>;
/// `StateTableInner` with the watermark cache toggled by a const parameter.
pub type WatermarkCacheParameterizedStateTable<S, const USE_WATERMARK_CACHE: bool> =
    StateTableInner<S, BasicSerde, false, USE_WATERMARK_CACHE>;

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
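    /// Initialize the state table with the given epoch pair. Must be called once and only
    /// once, before any other operation on the table.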
    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.local_store.init(InitOptions::new(epoch)).await?;
        assert_eq!(None, self.epoch.replace(epoch), "should not init twice");
        Ok(())
    }

    pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
        self.store
            .try_wait_epoch(
                HummockReadEpoch::Committed(prev_epoch),
                TryWaitEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await
    }

    pub fn state_store(&self) -> &S {
        &self.store
    }
}

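/// Build an `OpConsistencyLevel::ConsistentOldValue` check that compares the old value
/// passed to an operation against the value actually stored: if the raw bytes differ, the
/// rows are deserialized and compared, since different encodings may represent the same row.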
fn consistent_old_value_op(
    row_serde: Arc<impl ValueRowSerde>,
    is_log_store: bool,
) -> OpConsistencyLevel {
    OpConsistencyLevel::ConsistentOldValue {
        check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
            if first == second {
                return true;
            }
            let first = match row_serde.deserialize(first) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?first, "failed to deserialize the serialized value");
                    return false;
                }
            };
            let second = match row_serde.deserialize(second) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?second, "failed to deserialize the serialized value");
                    return false;
                }
            };
            if first != second {
                error!(first = ?first, second = ?second, "sanity check failed");
                false
            } else {
                true
            }
        }),
        is_log_store,
    }
}

#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
    /// Skip all sanity checks on write operations.
    Inconsistent,
    /// Check that the old value passed to delete/update matches the stored value.
    ConsistentOldValue,
    /// Same consistency requirement as `ConsistentOldValue`, but additionally keeps old
    /// values for the log store.
    LogStoreEnabled,
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
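    /// Create a state table from the table catalog and the state store, with the default
    /// `ConsistentOldValue` op consistency level.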
    pub async fn from_table_catalog(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        Self::from_table_catalog_with_consistency_level(
            table_catalog,
            store,
            vnodes,
            StateTableOpConsistencyLevel::ConsistentOldValue,
        )
        .await
    }

    /// Create a state table from the table catalog with sanity checks disabled.
    pub async fn from_table_catalog_inconsistent_op(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        Self::from_table_catalog_with_consistency_level(
            table_catalog,
            store,
            vnodes,
            StateTableOpConsistencyLevel::Inconsistent,
        )
        .await
    }

    pub async fn from_table_catalog_with_consistency_level(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        consistency_level: StateTableOpConsistencyLevel,
    ) -> Self {
        Self::from_table_catalog_inner(table_catalog, store, vnodes, consistency_level, vec![])
            .await
    }

    async fn from_table_catalog_inner(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        op_consistency_level: StateTableOpConsistencyLevel,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        let table_id = TableId::new(table_catalog.id);
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();
        let data_types: Vec<DataType> = table_catalog
            .columns
            .iter()
            .map(|col| {
                col.get_column_desc()
                    .unwrap()
                    .get_column_type()
                    .unwrap()
                    .into()
            })
            .collect();
        let order_types: Vec<OrderType> = table_catalog
            .pk
            .iter()
            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
            .collect();
        let dist_key_indices: Vec<usize> = table_catalog
            .distribution_key
            .iter()
            .map(|dist_index| *dist_index as usize)
            .collect();

        let pk_indices = table_catalog
            .pk
            .iter()
            .map(|col_order| col_order.column_index as usize)
            .collect_vec();

        // If `dist_key_in_pk` is set in the catalog, use it directly; otherwise compute it
        // from the distribution key and the pk indices.
        let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() {
            get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices).unwrap()
        } else {
            table_catalog
                .get_dist_key_in_pk()
                .iter()
                .map(|idx| *idx as usize)
                .collect()
        };

        let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
            let vnode_col_idx = *idx as usize;
            pk_indices.iter().position(|&i| vnode_col_idx == i)
        });

        let distribution =
            TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
        assert_eq!(
            distribution.vnode_count(),
            table_catalog.vnode_count(),
            "vnode count mismatch, scanning table {} under wrong distribution?",
            table_catalog.name,
        );

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);

        let input_value_indices = table_catalog
            .value_indices
            .iter()
            .map(|val| *val as usize)
            .collect_vec();

        let no_shuffle_value_indices = (0..table_columns.len()).collect_vec();

        // `None` means all columns are persisted in their original order.
        let value_indices = match input_value_indices.len() == table_columns.len()
            && input_value_indices == no_shuffle_value_indices
        {
            true => None,
            false => Some(input_value_indices),
        };
        let prefix_hint_len = table_catalog.read_prefix_len_hint as usize;

        let row_serde = Arc::new(SD::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.clone().into_boxed_slice()),
        ));

        let state_table_op_consistency_level = op_consistency_level;
        let op_consistency_level = match op_consistency_level {
            StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
            StateTableOpConsistencyLevel::ConsistentOldValue => {
                consistent_old_value_op(row_serde.clone(), false)
            }
            StateTableOpConsistencyLevel::LogStoreEnabled => {
                consistent_old_value_op(row_serde.clone(), true)
            }
        };

        let table_option = TableOption::new(table_catalog.retention_seconds);
        let new_local_options = if IS_REPLICATED {
            NewLocalOptions::new_replicated(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
            )
        } else {
            NewLocalOptions::new(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
                true,
            )
        };
        let local_state_store = store.new_local(new_local_options).await;

        // A versioned table must use a column-aware row serde, and vice versa.
        assert_eq!(
            table_catalog.version.is_some(),
            row_serde.kind().is_column_aware()
        );

        // Restore the committed watermark (if any) from the state store.
        let watermark_serde = if pk_indices.is_empty() {
            None
        } else {
            match table_catalog.clean_watermark_index_in_pk {
                None => Some(pk_serde.index(0)),
                Some(clean_watermark_index_in_pk) => {
                    Some(pk_serde.index(clean_watermark_index_in_pk as usize))
                }
            }
        };
        let max_watermark_of_vnodes = distribution
            .vnodes()
            .iter_vnodes()
            .filter_map(|vnode| local_state_store.get_table_watermark(vnode))
            .max();
        let committed_watermark = if let Some(deser) = watermark_serde
            && let Some(max_watermark) = max_watermark_of_vnodes
        {
            let deserialized = deser.deserialize(&max_watermark).ok().and_then(|row| {
                assert_eq!(row.len(), 1);
                row[0].clone()
            });
            if deserialized.is_none() {
                tracing::error!(
                    vnodes = ?distribution.vnodes(),
                    watermark = ?max_watermark,
                    "Failed to deserialize persisted watermark from state store.",
                );
            }
            deserialized
        } else {
            None
        };

        let watermark_cache = if USE_WATERMARK_CACHE {
            StateTableWatermarkCache::new(WATERMARK_CACHE_ENTRIES)
        } else {
            StateTableWatermarkCache::new(0)
        };

        // Compute the info needed by replicated state tables.
        let output_column_ids_to_input_idx = output_column_ids
            .iter()
            .enumerate()
            .map(|(pos, id)| (*id, pos))
            .collect::<HashMap<_, _>>();

        let columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|c| c.column_desc.as_ref().unwrap().into())
            .collect_vec();

        // For each input column, record its position in the output (if present).
        let mut i2o_mapping = vec![None; columns.len()];
        for (i, column) in columns.iter().enumerate() {
            if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
                i2o_mapping[i] = Some(*pos);
            }
        }
        let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());

        let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);

        Self {
            table_id,
            local_store: local_state_store,
            store,
            epoch: None,
            pk_serde,
            row_serde,
            pk_indices,
            distribution,
            prefix_hint_len,
            table_option,
            value_indices,
            pending_watermark: None,
            committed_watermark,
            watermark_cache,
            data_types,
            output_indices,
            i2o_mapping,
            op_consistency_level: state_table_op_consistency_level,
            clean_watermark_index_in_pk: table_catalog.clean_watermark_index_in_pk,
            on_post_commit: false,
        }
    }

    pub fn get_data_types(&self) -> &[DataType] {
        &self.data_types
    }

    pub fn table_id(&self) -> u32 {
        self.table_id.table_id
    }

    /// Get the vnode value with the given primary key prefix.
    fn compute_prefix_vnode(&self, pk_prefix: impl Row) -> VirtualNode {
        self.distribution
            .try_compute_vnode_by_pk_prefix(pk_prefix)
            .expect("For streaming, the given prefix must be enough to calculate the vnode")
    }

    /// Get the vnode value of the given primary key.
    pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
        self.distribution.compute_vnode_by_pk(pk)
    }

    /// Note: the returned indices are based on the full table schema, not the output columns.
    pub fn pk_indices(&self) -> &[usize] {
        &self.pk_indices
    }

    /// Get the indices of the primary key columns in the output columns.
    ///
    /// Returns `None` if any primary key column is not among the output columns.
    /// Only applicable to replicated state tables.
    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
        assert!(IS_REPLICATED);
        self.pk_indices
            .iter()
            .map(|&i| self.output_indices.iter().position(|&j| i == j))
            .collect()
    }

    pub fn pk_serde(&self) -> &OrderedRowSerde {
        &self.pk_serde
    }

    pub fn vnodes(&self) -> &Arc<Bitmap> {
        self.distribution.vnodes()
    }

    pub fn value_indices(&self) -> &Option<Vec<usize>> {
        &self.value_indices
    }

    pub fn is_consistent_op(&self) -> bool {
        matches!(
            self.op_consistency_level,
            StateTableOpConsistencyLevel::ConsistentOldValue
                | StateTableOpConsistencyLevel::LogStoreEnabled
        )
    }
}

impl<S, SD, const USE_WATERMARK_CACHE: bool> StateTableInner<S, SD, true, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create a replicated state table from the table catalog with the given output column ids.
    pub async fn from_table_catalog_with_output_column_ids(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        Self::from_table_catalog_inner(
            table_catalog,
            store,
            vnodes,
            StateTableOpConsistencyLevel::Inconsistent,
            output_column_ids,
        )
        .await
    }
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Get a single row from the state table.
    pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
        let row_serde = self.row_serde.clone();
        let row = self
            .get_inner(pk, move |_, value| Ok(row_serde.deserialize(value)?))
            .await?;
        match row {
            Some(row) => {
                if IS_REPLICATED {
                    // If the table is replicated, project the row onto the output columns.
                    let row = row.project(&self.output_indices);
                    Ok(Some(row.into_owned_row()))
                } else {
                    Ok(Some(OwnedRow::new(row)))
                }
            }
            None => Ok(None),
        }
    }

    /// Get the raw encoded row from the state table.
    pub async fn get_encoded_row(&self, pk: impl Row) -> StreamExecutorResult<Option<Bytes>> {
        self.get_inner(pk, |_, value| Ok(Bytes::copy_from_slice(value)))
            .await
    }

    async fn get_inner<O: Send + 'static>(
        &self,
        pk: impl Row,
        on_key_value_fn: impl risingwave_storage::store::KeyValueFn<O>,
    ) -> StreamExecutorResult<Option<O>> {
        assert!(pk.len() <= self.pk_indices.len());

        let serialized_pk =
            serialize_pk_with_vnode(&pk, &self.pk_serde, self.compute_vnode_by_pk(&pk));

        let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() {
            Some(serialized_pk.slice(VirtualNode::SIZE..))
        } else {
            #[cfg(debug_assertions)]
            if self.prefix_hint_len != 0 {
                warn!(
                    "prefix_hint_len is not equal to pk.len(), may not be able to utilize bloom filter"
                );
            }
            None
        };

        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };

        self.local_store
            .on_key_value(serialized_pk, read_options, on_key_value_fn)
            .await
            .map_err(Into::into)
    }
}

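/// Post-commit handle returned by [`StateTableInner::commit`]. The caller must call
/// [`Self::post_yield_barrier`] after yielding the barrier to downstream, passing the new
/// vnode bitmap if the actor is being rescaled, so that the table can finish the commit.
///
/// A minimal sketch of the intended call sequence (hypothetical `table`, `barrier`, and
/// `new_vnodes` values):
///
/// ```ignore
/// let post_commit = table.commit(barrier.epoch).await?;
/// // ... yield the barrier to downstream ...
/// if let Some(((new_vnodes, old_vnodes, table), cache_may_stale)) =
///     post_commit.post_yield_barrier(new_vnodes).await?
/// {
///     // The vnode bitmap was updated; executor caches should be invalidated
///     // when `cache_may_stale` is true.
/// }
/// ```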
#[must_use]
pub struct StateTablePostCommit<
    'a,
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    inner: &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
}

impl<'a, S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTablePostCommit<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Finish the commit after the barrier is yielded. When `new_vnodes` is provided, the
    /// vnode bitmap is updated and `Some(((new_vnodes, old_vnodes, table), cache_may_stale))`
    /// is returned; otherwise `None`.
    pub async fn post_yield_barrier(
        mut self,
        new_vnodes: Option<Arc<Bitmap>>,
    ) -> StreamExecutorResult<
        Option<(
            (
                Arc<Bitmap>,
                Arc<Bitmap>,
                &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
            ),
            bool,
        )>,
    > {
        self.inner.on_post_commit = false;
        Ok(if let Some(new_vnodes) = new_vnodes {
            let (old_vnodes, cache_may_stale) =
                self.update_vnode_bitmap(new_vnodes.clone()).await?;
            Some(((new_vnodes, old_vnodes, self.inner), cache_may_stale))
        } else {
            None
        })
    }

    pub fn inner(&self) -> &StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE> {
        &*self.inner
    }

    /// Update the vnode bitmap of the state table, returning the previous vnode bitmap and
    /// whether the executor's cache may be stale.
    async fn update_vnode_bitmap(
        &mut self,
        new_vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<(Arc<Bitmap>, bool)> {
        let prev_vnodes = self
            .inner
            .local_store
            .update_vnode_bitmap(new_vnodes.clone())
            .await?;
        assert_eq!(
            &prev_vnodes,
            self.inner.vnodes(),
            "state table and state store vnode bitmap mismatches"
        );

        if self.inner.distribution.is_singleton() {
            assert_eq!(
                &new_vnodes,
                self.inner.vnodes(),
                "should not update vnode bitmap for singleton table"
            );
        }
        assert_eq!(self.inner.vnodes().len(), new_vnodes.len());

        let cache_may_stale = cache_may_stale(self.inner.vnodes(), &new_vnodes);

        if cache_may_stale {
            self.inner.pending_watermark = None;
            if USE_WATERMARK_CACHE {
                self.inner.watermark_cache.clear();
            }
        }

        Ok((
            self.inner.distribution.update_vnode_bitmap(new_vnodes),
            cache_may_stale,
        ))
    }
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    fn handle_mem_table_error(&self, e: StorageError) {
        let e = match e.into_inner() {
            ErrorKind::MemTable(e) => e,
            _ => unreachable!("should only get memtable error"),
        };
        match *e {
            MemTableError::InconsistentOperation { key, prev, new } => {
                let (vnode, key) = deserialize_pk_with_vnode(&key, &self.pk_serde).unwrap();
                panic!(
                    "mem-table operation inconsistent! table_id: {}, vnode: {}, key: {:?}, prev: {}, new: {}",
                    self.table_id(),
                    vnode,
                    &key,
                    prev.debug_fmt(&*self.row_serde),
                    new.debug_fmt(&*self.row_serde),
                )
            }
        }
    }

    fn serialize_value(&self, value: impl Row) -> Bytes {
        if let Some(value_indices) = self.value_indices.as_ref() {
            self.row_serde
                .serialize(value.project(value_indices))
                .into()
        } else {
            self.row_serde.serialize(value).into()
        }
    }

    fn insert_inner(&mut self, key: TableKey<Bytes>, value_bytes: Bytes) {
        insane_mode_discard_point!();
        self.local_store
            .insert(key, value_bytes, None)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn delete_inner(&mut self, key: TableKey<Bytes>, value_bytes: Bytes) {
        insane_mode_discard_point!();
        self.local_store
            .delete(key, value_bytes)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn update_inner(
        &mut self,
        key_bytes: TableKey<Bytes>,
        old_value_bytes: Option<Bytes>,
        new_value_bytes: Bytes,
    ) {
        insane_mode_discard_point!();
        self.local_store
            .insert(key_bytes, new_value_bytes, old_value_bytes)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

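    /// Insert a full row (matching the column descs of the table) into the state table.
    ///
    /// A minimal sketch of the single-row write API (hypothetical `row` and `new_row`
    /// values of the table's schema):
    ///
    /// ```ignore
    /// table.insert(row.clone());  // key must not already exist under `ConsistentOldValue`
    /// table.update(row, new_row); // old and new rows must share the same pk
    /// table.delete(new_row);      // must pass the current full old value
    /// ```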
    pub fn insert(&mut self, value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&value).project(pk_indices);
        if USE_WATERMARK_CACHE {
            self.watermark_cache.insert(&pk);
        }

        let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk));
        let value_bytes = self.serialize_value(value);
        self.insert_inner(key_bytes, value_bytes);
    }

    /// Delete a row from the state table. Must pass the full old value corresponding to the
    /// column descs of the table.
    pub fn delete(&mut self, old_value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&old_value).project(pk_indices);
        if USE_WATERMARK_CACHE {
            self.watermark_cache.delete(&pk);
        }

        let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk));
        let value_bytes = self.serialize_value(old_value);
        self.delete_inner(key_bytes, value_bytes);
    }

    /// Update a row. The old and new values must have the same pk.
    pub fn update(&mut self, old_value: impl Row, new_value: impl Row) {
        let old_pk = (&old_value).project(self.pk_indices());
        let new_pk = (&new_value).project(self.pk_indices());
        debug_assert!(
            Row::eq(&old_pk, new_pk),
            "pk should not change: {old_pk:?} vs {new_pk:?}",
        );

        let new_key_bytes =
            serialize_pk_with_vnode(new_pk, &self.pk_serde, self.compute_vnode_by_pk(new_pk));
        let old_value_bytes = self.serialize_value(old_value);
        let new_value_bytes = self.serialize_value(new_value);

        self.update_inner(new_key_bytes, Some(old_value_bytes), new_value_bytes);
    }

    /// Write a record into the state table, dispatching to `insert`, `delete`, or `update`.
    pub fn write_record(&mut self, record: Record<impl Row>) {
        match record {
            Record::Insert { new_row } => self.insert(new_row),
            Record::Delete { old_row } => self.delete(old_row),
            Record::Update { old_row, new_row } => self.update(old_row, new_row),
        }
    }

    fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
        fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
    }

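    /// Write a `StreamChunk` into the state table, dispatching each visible `Insert`/
    /// `UpdateInsert` op to an insert and each `Delete`/`UpdateDelete` op to a delete.
    /// For replicated tables, columns absent from the output are first filled with NULLs.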
    #[allow(clippy::disallowed_methods)]
    pub fn write_chunk(&mut self, chunk: StreamChunk) {
        let chunk = if IS_REPLICATED {
            self.fill_non_output_indices(chunk)
        } else {
            chunk
        };
        let (chunk, op) = chunk.into_parts();

        let vnodes = self
            .distribution
            .compute_chunk_vnode(&chunk, &self.pk_indices);

        let values = if let Some(ref value_indices) = self.value_indices {
            chunk
                .project(value_indices)
                .serialize_with(&*self.row_serde)
        } else {
            chunk.serialize_with(&*self.row_serde)
        };

        // Serialize the pk with the vnode prefix for each row; invisible rows get only the
        // vnode prefix.
        let key_chunk = chunk.project(self.pk_indices());
        let vnode_and_pks = key_chunk
            .rows_with_holes()
            .zip_eq_fast(vnodes.iter())
            .map(|(r, vnode)| {
                let mut buffer = BytesMut::new();
                buffer.put_slice(&vnode.to_be_bytes()[..]);
                if let Some(r) = r {
                    self.pk_serde.serialize(r, &mut buffer);
                }
                (r, buffer.freeze())
            })
            .collect_vec();

        if !key_chunk.is_compacted() {
            // The chunk has a visibility bitmap: skip invisible rows.
            for ((op, (key, key_bytes), value), vis) in
                izip!(op.iter(), vnode_and_pks, values).zip_eq_debug(key_chunk.visibility().iter())
            {
                if vis {
                    match op {
                        Op::Insert | Op::UpdateInsert => {
                            if USE_WATERMARK_CACHE && let Some(ref pk) = key {
                                self.watermark_cache.insert(pk);
                            }
                            self.insert_inner(TableKey(key_bytes), value);
                        }
                        Op::Delete | Op::UpdateDelete => {
                            if USE_WATERMARK_CACHE && let Some(ref pk) = key {
                                self.watermark_cache.delete(pk);
                            }
                            self.delete_inner(TableKey(key_bytes), value);
                        }
                    }
                }
            }
        } else {
            for (op, (key, key_bytes), value) in izip!(op.iter(), vnode_and_pks, values) {
                match op {
                    Op::Insert | Op::UpdateInsert => {
                        if USE_WATERMARK_CACHE && let Some(ref pk) = key {
                            self.watermark_cache.insert(pk);
                        }
                        self.insert_inner(TableKey(key_bytes), value);
                    }
                    Op::Delete | Op::UpdateDelete => {
                        if USE_WATERMARK_CACHE && let Some(ref pk) = key {
                            self.watermark_cache.delete(pk);
                        }
                        self.delete_inner(TableKey(key_bytes), value);
                    }
                }
            }
        }
    }

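    /// Update the watermark used for state cleaning. The watermark is buffered and only takes
    /// effect on the next `commit`, where rows below it become eligible for cleaning.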
    pub fn update_watermark(&mut self, watermark: ScalarImpl) {
        trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark");
        self.pending_watermark = Some(watermark);
    }

    /// Get the committed watermark of the state table. Watermarks should be fed into the
    /// state table through the `update_watermark` method.
    pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> {
        self.committed_watermark.as_ref()
    }

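    /// Flush buffered writes, seal the current epoch (together with any pending watermark),
    /// and advance the table to `new_epoch`. The returned [`StateTablePostCommit`] must be
    /// driven after the barrier is yielded.
    ///
    /// A minimal sketch (hypothetical `table` and `barrier` values; `None` means the vnode
    /// bitmap is unchanged):
    ///
    /// ```ignore
    /// let post_commit = table.commit(barrier.epoch).await?;
    /// // ... yield the barrier to downstream ...
    /// post_commit.post_yield_barrier(None).await?;
    /// ```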
    pub async fn commit(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        self.commit_inner(new_epoch, None).await
    }

    #[cfg(test)]
    pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
        self.commit_assert_no_update_vnode_bitmap(new_epoch).await
    }

    pub async fn commit_assert_no_update_vnode_bitmap(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<()> {
        let post_commit = self.commit_inner(new_epoch, None).await?;
        post_commit.post_yield_barrier(None).await?;
        Ok(())
    }

    pub async fn commit_may_switch_consistent_op(
        &mut self,
        new_epoch: EpochPair,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        if self.op_consistency_level != op_consistency_level {
            info!(
                ?new_epoch,
                prev_op_consistency_level = ?self.op_consistency_level,
                ?op_consistency_level,
                table_id = self.table_id.table_id,
                "switch to new op consistency level"
            );
            self.commit_inner(new_epoch, Some(op_consistency_level))
                .await
        } else {
            self.commit_inner(new_epoch, None).await
        }
    }

    async fn commit_inner(
        &mut self,
        new_epoch: EpochPair,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        assert!(!self.on_post_commit);
        assert_eq!(
            self.epoch.expect("should only be called after init").curr,
            new_epoch.prev
        );
        let switch_op_consistency_level = switch_consistent_op.map(|new_consistency_level| {
            assert_ne!(self.op_consistency_level, new_consistency_level);
            self.op_consistency_level = new_consistency_level;
            match new_consistency_level {
                StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
                StateTableOpConsistencyLevel::ConsistentOldValue => {
                    consistent_old_value_op(self.row_serde.clone(), false)
                }
                StateTableOpConsistencyLevel::LogStoreEnabled => {
                    consistent_old_value_op(self.row_serde.clone(), true)
                }
            }
        });
        trace!(
            table_id = %self.table_id,
            epoch = ?self.epoch,
            "commit state table"
        );

        self.local_store
            .flush()
            .instrument(tracing::info_span!("state_table_flush"))
            .await?;
        let table_watermarks = self.commit_pending_watermark();
        self.local_store.seal_current_epoch(
            new_epoch.curr,
            SealCurrentEpochOptions {
                table_watermarks,
                switch_op_consistency_level,
            },
        );
        self.epoch = Some(new_epoch);

        // Refill the watermark cache if it has not been synced yet and there is a committed
        // watermark to refill from.
        if USE_WATERMARK_CACHE
            && !self.watermark_cache.is_synced()
            && let Some(ref watermark) = self.committed_watermark
        {
            let range: (Bound<Once<Datum>>, Bound<Once<Datum>>) =
                (Included(once(Some(watermark.clone()))), Unbounded);
            // Scope the iteration so `self` can be mutably borrowed afterwards.
            let mut pks = Vec::with_capacity(self.watermark_cache.capacity());
            {
                let mut streams = vec![];
                for vnode in self.vnodes().iter_vnodes() {
                    let stream = self
                        .iter_keyed_row_with_vnode(vnode, &range, PrefetchOptions::default())
                        .await?;
                    streams.push(Box::pin(stream));
                }
                let merged_stream = merge_sort(streams);
                pin_mut!(merged_stream);

                #[for_await]
                for entry in merged_stream.take(self.watermark_cache.capacity()) {
                    let keyed_row = entry?;
                    let pk = self.pk_serde.deserialize(keyed_row.key())?;
                    // Only cache pks whose watermark column is non-NULL.
                    if !pk.is_null_at(self.clean_watermark_index_in_pk.unwrap_or(0) as usize) {
                        pks.push(pk);
                    }
                }
            }

            let mut filler = self.watermark_cache.begin_syncing();
            for pk in pks {
                filler.insert_unchecked(DefaultOrdered(pk), ());
            }
            filler.finish();

            let n_cache_entries = self.watermark_cache.len();
            if n_cache_entries < self.watermark_cache.capacity() {
                self.watermark_cache.set_table_row_count(n_cache_entries);
            }
        }

        self.on_post_commit = true;
        Ok(StateTablePostCommit { inner: self })
    }
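    /// Take the pending watermark, if any, and convert it into the table watermarks to seal
    /// with the current epoch. The watermark cache (when enabled and synced) is consulted to
    /// avoid issuing a state-cleaning watermark that would delete nothing.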
    fn commit_pending_watermark(
        &mut self,
    ) -> Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)> {
        let watermark = self.pending_watermark.take()?;
        trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning");

        assert!(
            !self.pk_indices().is_empty(),
            "see pending watermark on empty pk"
        );
        let watermark_serializer = {
            match self.clean_watermark_index_in_pk {
                None => self.pk_serde.index(0),
                Some(clean_watermark_index_in_pk) => {
                    self.pk_serde.index(clean_watermark_index_in_pk as usize)
                }
            }
        };

        let watermark_type = match self.clean_watermark_index_in_pk {
            None => WatermarkSerdeType::PkPrefix,
            Some(clean_watermark_index_in_pk) => match clean_watermark_index_in_pk {
                0 => WatermarkSerdeType::PkPrefix,
                _ => WatermarkSerdeType::NonPkPrefix,
            },
        };

        let should_clean_watermark = if USE_WATERMARK_CACHE && self.watermark_cache.is_synced() {
            if let Some(key) = self.watermark_cache.lowest_key() {
                watermark.as_scalar_ref_impl().default_cmp(&key).is_ge()
            } else {
                // The cache is synced but empty, so the table holds no rows below the
                // watermark; there is nothing to clean.
                false
            }
        } else {
            // The cache is disabled or not yet synced; conservatively issue the
            // state-cleaning watermark.
            true
        };

        let watermark_suffix =
            serialize_pk(row::once(Some(watermark.clone())), &watermark_serializer);

        let seal_watermark = if should_clean_watermark {
            trace!(table_id = %self.table_id, watermark = ?watermark_suffix, vnodes = ?{
                self.vnodes().iter_vnodes().collect_vec()
            }, "delete range");

            let order_type = watermark_serializer.get_order_types().first().unwrap();

            if order_type.is_ascending() {
                Some((
                    WatermarkDirection::Ascending,
                    VnodeWatermark::new(
                        self.vnodes().clone(),
                        Bytes::copy_from_slice(watermark_suffix.as_ref()),
                    ),
                    watermark_type,
                ))
            } else {
                Some((
                    WatermarkDirection::Descending,
                    VnodeWatermark::new(
                        self.vnodes().clone(),
                        Bytes::copy_from_slice(watermark_suffix.as_ref()),
                    ),
                    watermark_type,
                ))
            }
        } else {
            None
        };
        self.committed_watermark = Some(watermark);

        // Clear the watermark cache after a state-cleaning watermark is sealed: cached
        // entries below the watermark are no longer valid.
        if USE_WATERMARK_CACHE && seal_watermark.is_some() {
            self.watermark_cache.clear();
        }

        seal_watermark.map(|(direction, watermark, is_non_pk_prefix)| {
            (direction, vec![watermark], is_non_pk_prefix)
        })
    }

    pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.local_store.try_flush().await?;
        Ok(())
    }
}

pub trait RowStream<'a> = Stream<Item = StreamExecutorResult<OwnedRow>> + 'a;
pub trait KeyedRowStream<'a> = Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a;
pub trait PkRowStream<'a, K> = Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a;

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
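    /// Iterate over the rows of a single vnode within the given pk range, yielding
    /// deserialized rows in pk order.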
    pub async fn iter_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(deserialize_keyed_row_stream::<'_, ()>(
            self.iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
                .await?,
            &*self.row_serde,
        )
        .map_ok(|(_, row)| row))
    }

    pub async fn iter_keyed_row_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(deserialize_keyed_row_stream(
            self.iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
                .await?,
            &*self.row_serde,
        )
        .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    pub async fn iter_with_vnode_and_output_indices(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        assert!(IS_REPLICATED);
        let stream = self
            .iter_with_vnode(vnode, pk_range, prefetch_options)
            .await?;
        Ok(stream.map(|row| row.map(|row| row.project(&self.output_indices).into_owned_row())))
    }

    async fn iter_kv(
        &self,
        table_key_range: TableKeyRange,
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<<S::Local as LocalStateStore>::Iter<'_>> {
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(self.local_store.iter(table_key_range, read_options).await?)
    }

    async fn rev_iter_kv(
        &self,
        table_key_range: TableKeyRange,
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<<S::Local as LocalStateStore>::RevIter<'_>> {
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(self
            .local_store
            .rev_iter(table_key_range, read_options)
            .await?)
    }

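    /// Iterate over rows whose pk starts with `pk_prefix`, optionally narrowed by `sub_range`
    /// over the remaining pk columns. The prefix must be sufficient to determine the vnode.
    ///
    /// A minimal sketch, assuming a hypothetical table whose pk is `(group_key, sort_key)`:
    ///
    /// ```ignore
    /// let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
    /// let stream = table
    ///     .iter_with_prefix(&group_key, sub_range, PrefetchOptions::default())
    ///     .await?;
    /// pin_mut!(stream);
    /// while let Some(row) = stream.next().await.transpose()? {
    ///     // every `row` shares the `group_key` prefix
    /// }
    /// ```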
    pub async fn iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        let stream = self
            .iter_with_prefix_inner::<false, ()>(pk_prefix, sub_range, prefetch_options)
            .await?;
        Ok(stream.map_ok(|(_, row)| row))
    }

    /// Get the row from a state table with only one row.
    pub async fn get_from_one_row_table(&self) -> StreamExecutorResult<Option<OwnedRow>> {
        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
        let stream = self
            .iter_with_prefix(row::empty(), sub_range, Default::default())
            .await?;
        pin_mut!(stream);

        if let Some(res) = stream.next().await {
            let value = res?.into_owned_row();
            assert!(stream.next().await.is_none());
            Ok(Some(value))
        } else {
            Ok(None)
        }
    }

    /// Get the scalar value from a state table with only one row and a single value column.
    pub async fn get_from_one_value_table(&self) -> StreamExecutorResult<Option<ScalarImpl>> {
        Ok(self
            .get_from_one_row_table()
            .await?
            .and_then(|row| row[0].clone()))
    }

    pub async fn iter_keyed_row_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::<false, Bytes>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    /// Reverse version of `iter_with_prefix`.
    pub async fn rev_iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::<true, ()>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

    async fn iter_with_prefix_inner<const REVERSE: bool, K: CopyFromSlice>(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
        let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);

        // The pk prefix must determine the vnode.
        let vnode = self.compute_prefix_vnode(&pk_prefix);

        // Construct the prefix hint for the bloom filter.
        let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
        if self.prefix_hint_len != 0 {
            debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
        }
        let prefix_hint = {
            if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() {
                None
            } else {
                let encoded_prefix_len = self
                    .pk_serde
                    .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;

                Some(Bytes::copy_from_slice(
                    &encoded_prefix[..encoded_prefix_len],
                ))
            }
        };

        trace!(
            table_id = %self.table_id(),
            ?prefix_hint, ?pk_prefix,
            ?pk_prefix_indices,
            iter_direction = if REVERSE { "reverse" } else { "forward" },
            "storage_iter_with_prefix"
        );

        let memcomparable_range =
            prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);

        let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode);

        Ok(if REVERSE {
            futures::future::Either::Left(deserialize_keyed_row_stream(
                self.rev_iter_kv(
                    memcomparable_range_with_vnode,
                    prefix_hint,
                    prefetch_options,
                )
                .await?,
                &*self.row_serde,
            ))
        } else {
            futures::future::Either::Right(deserialize_keyed_row_stream(
                self.iter_kv(
                    memcomparable_range_with_vnode,
                    prefix_hint,
                    prefetch_options,
                )
                .await?,
                &*self.row_serde,
            ))
        })
    }

    /// Iterate over the state store KVs of the given vnode within the given pk range.
    async fn iter_kv_with_pk_range(
        &self,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<<S::Local as LocalStateStore>::Iter<'_>> {
        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);
        let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode);

        self.iter_kv(memcomparable_range_with_vnode, None, prefetch_options)
            .await
    }

    #[cfg(test)]
    pub fn get_watermark_cache(&self) -> &StateTableWatermarkCache {
        &self.watermark_cache
    }
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub async fn iter_log_with_vnode(
        &self,
        vnode: VirtualNode,
        epoch_range: (u64, u64),
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
    ) -> StreamExecutorResult<impl Stream<Item = StreamExecutorResult<ChangeLogRow>> + '_> {
        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);
        let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode);
        Ok(deserialize_log_stream(
            self.store
                .iter_log(
                    epoch_range,
                    memcomparable_range_with_vnode,
                    ReadLogOptions {
                        table_id: self.table_id,
                    },
                )
                .await?,
            &*self.row_serde,
        )
        .map_err(Into::into))
    }
}

fn deserialize_keyed_row_stream<'a, K: CopyFromSlice>(
    iter: impl StateStoreIter + 'a,
    deserializer: &'a impl ValueRowSerde,
) -> impl PkRowStream<'a, K> {
    iter.into_stream(move |(key, value)| {
        Ok((
            K::copy_from_slice(key.user_key.table_key.as_ref()),
            deserializer.deserialize(value).map(OwnedRow::new)?,
        ))
    })
    .map_err(Into::into)
}

pub fn prefix_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    range: &(Bound<impl Row>, Bound<impl Row>),
) -> (Bound<Bytes>, Bound<Bytes>) {
    (
        start_range_to_memcomparable(pk_serde, &range.0),
        end_range_to_memcomparable(pk_serde, &range.1, None),
    )
}

fn prefix_and_sub_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    sub_range: &(Bound<impl Row>, Bound<impl Row>),
    pk_prefix: impl Row,
) -> (Bound<Bytes>, Bound<Bytes>) {
    let (range_start, range_end) = sub_range;
    let prefix_serializer = pk_serde.prefix(pk_prefix.len());
    let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
    let start_range = match range_start {
        Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))),
        Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))),
        Unbounded => Bound::Included(Either::Right(&pk_prefix)),
    };
    let end_range = match range_end {
        Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)),
        Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)),
        Unbounded => Unbounded,
    };
    (
        start_range_to_memcomparable(pk_serde, &start_range),
        end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
    )
}

fn start_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => Unbounded,
        Included(r) => {
            let serialized = serialize_pk_prefix(r);
            Included(serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            start_bound_of_excluded_prefix(&serialized)
        }
    }
}

fn end_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
    serialized_pk_prefix: Option<Bytes>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => match serialized_pk_prefix {
            Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix),
            None => Unbounded,
        },
        Included(r) => {
            let serialized = serialize_pk_prefix(r);
            end_bound_of_prefix(&serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            Excluded(serialized)
        }
    }
}
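/// Rebuild a chunk in the full table schema from a chunk in the output schema: columns
/// present in the output are mapped back through `i2o_mapping`, and all other columns are
/// filled with NULLs.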
fn fill_non_output_indices(
    i2o_mapping: &ColIndexMapping,
    data_types: &[DataType],
    chunk: StreamChunk,
) -> StreamChunk {
    let cardinality = chunk.cardinality();
    let (ops, columns, vis) = chunk.into_inner();
    let mut full_columns = Vec::with_capacity(data_types.len());
    for (i, data_type) in data_types.iter().enumerate() {
        if let Some(j) = i2o_mapping.try_map(i) {
            full_columns.push(columns[j].clone());
        } else {
            let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
            column_builder.append_n_null(cardinality);
            let column: ArrayRef = column_builder.finish().into();
            full_columns.push(column)
        }
    }
    let data_chunk = DataChunk::new(full_columns, vis);
    StreamChunk::from_parts(ops, data_chunk)
}

#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    fn check(actual: impl Debug, expect: Expect) {
        let actual = format!("{:#?}", actual);
        expect.assert_eq(&actual);
    }

    #[test]
    fn test_fill_non_output_indices() {
        let data_types = vec![DataType::Int32, DataType::Int32, DataType::Int32];
        let replicated_chunk = [OwnedRow::new(vec![
            Some(222_i32.into()),
            Some(2_i32.into()),
        ])];
        let replicated_chunk = StreamChunk::from_parts(
            vec![Op::Insert],
            DataChunk::from_rows(&replicated_chunk, &[DataType::Int32, DataType::Int32]),
        );
        let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
        let filled_chunk = fill_non_output_indices(&i2o_mapping, &data_types, replicated_chunk);
        check(
            filled_chunk,
            expect![[r#"
                StreamChunk { cardinality: 1, capacity: 1, data:
                +---+---+---+-----+
                | + | 2 |   | 222 |
                +---+---+---+-----+
                }"#]],
        );
    }
}