use std::collections::HashMap;
use std::ops::Bound;
use std::ops::Bound::*;
use std::sync::Arc;

use bytes::{BufMut, Bytes, BytesMut};
use either::Either;
use foyer::Hint;
use futures::{Stream, StreamExt, TryStreamExt, pin_mut};
use futures_async_stream::for_await;
use itertools::{Itertools, izip};
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnDesc, ColumnId, TableId, TableOption, get_dist_key_in_pk_indices,
};
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
use risingwave_common::row::{self, Once, OwnedRow, Row, RowExt, once};
use risingwave_common::types::{DataType, Datum, DefaultOrd, DefaultOrdered, ScalarImpl};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{
    CopyFromSlice, TableKey, TableKeyRange, end_bound_of_prefix, prefixed_range_with_vnode,
    start_bound_of_excluded_prefix,
};
use risingwave_hummock_sdk::table_watermark::{
    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_pb::catalog::Table;
use risingwave_storage::StateStore;
use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::mem_table::MemTableError;
use risingwave_storage::row_serde::find_columns_by_ids;
use risingwave_storage::row_serde::row_serde_util::{
    deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode,
};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::*;
use risingwave_storage::table::merge_sort::merge_sort;
use risingwave_storage::table::{
    ChangeLogRow, KeyedRow, TableDistribution, deserialize_log_stream,
};
use thiserror_ext::AsReport;
use tracing::{Instrument, trace};

use crate::cache::cache_may_stale;
use crate::common::state_cache::{StateCache, StateCacheFiller};
use crate::common::table::state_table_cache::StateTableWatermarkCache;
use crate::executor::StreamExecutorResult;

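/// Capacity of the watermark cache: the maximum number of lowest primary keys cached per state
/// table for state cleaning (see [`StateTableWatermarkCache`]).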
const WATERMARK_CACHE_ENTRIES: usize = 16;

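/// When the "insane" consistency-test mode is enabled, randomly discard about 30% of the write
/// operations that pass through this point, to exercise inconsistency handling.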
macro_rules! insane_mode_discard_point {
    () => {{
        use rand::Rng;
        if crate::consistency::insane() && rand::rng().random_bool(0.3) {
            return;
        }
    }};
}

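/// `StateTableInner` is the interface for accessing relational data in KV (`StateStore`) with
/// row-based encoding. Typical lifecycle (an illustrative sketch, not a compiled doctest):
///
/// ```ignore
/// let mut table = StateTable::from_table_catalog(&table_catalog, store, Some(vnodes)).await;
/// table.init_epoch(epoch).await?;
/// table.insert(row);
/// let post_commit = table.commit(new_epoch).await?;
/// // After yielding the barrier downstream, finish the commit.
/// post_commit.post_yield_barrier(None).await?;
/// ```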
#[derive(Clone)]
pub struct StateTableInner<
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Id of this table.
    table_id: TableId,

    /// Local state store that buffers row operations.
    local_store: S::Local,

    /// Global state store backend.
    store: S,

    /// Current epoch pair. `None` until `init_epoch` is called.
    epoch: Option<EpochPair>,

    /// Serializer and deserializer for the primary key.
    pk_serde: OrderedRowSerde,

    /// Row serializer and deserializer with value encoding.
    row_serde: Arc<SD>,

    /// Indices of the primary key columns, based on the full schema of the table.
    pk_indices: Vec<usize>,

    /// Distribution of the state table, mapping rows to vnodes.
    distribution: TableDistribution,

    /// Number of leading pk columns used as the prefix hint for the bloom filter.
    prefix_hint_len: usize,

    /// Table options, e.g. data retention.
    table_option: TableOption,

    /// Indices of the value columns, or `None` if all columns are values.
    value_indices: Option<Vec<usize>>,

    /// Pending watermark for state cleaning; takes effect on the next commit.
    pending_watermark: Option<ScalarImpl>,
    /// Last committed watermark for state cleaning.
    committed_watermark: Option<ScalarImpl>,
    /// Cache of the lowest primary keys, used to avoid issuing unnecessary cleaning watermarks.
    watermark_cache: StateTableWatermarkCache,

    /// Data types of all columns. Owned so that a replicated state table can fill in datums for
    /// the non-output columns.
    data_types: Vec<DataType>,

    /// Mapping from the full table schema ("i") to the replicated table's output schema ("o").
    /// Only meaningful when `IS_REPLICATED` is true.
    i2o_mapping: ColIndexMapping,

    /// Indices of the output columns in the table schema, for replicated state tables.
    output_indices: Vec<usize>,

    /// Current op consistency level of this table.
    op_consistency_level: StateTableOpConsistencyLevel,

    /// Index of the state-cleaning watermark column in the pk; `None` means the first pk column.
    clean_watermark_index_in_pk: Option<i32>,

    /// Set when `commit` returns a `StateTablePostCommit` that has not yet been consumed by
    /// `post_yield_barrier`.
    on_post_commit: bool,
}

/// The default state table with `BasicSerde` value encoding.
pub type StateTable<S> = StateTableInner<S, BasicSerde>;
/// A state table that replicates a base state table and may output only a subset of its columns.
pub type ReplicatedStateTable<S, SD> = StateTableInner<S, SD, true>;
/// A state table with the watermark cache enabled, to reduce unnecessary state cleaning.
pub type WatermarkCacheStateTable<S> = StateTableInner<S, BasicSerde, false, true>;
pub type WatermarkCacheParameterizedStateTable<S, const USE_WATERMARK_CACHE: bool> =
    StateTableInner<S, BasicSerde, false, USE_WATERMARK_CACHE>;

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Initialize the state table with the given epoch. Must be called exactly once, before any
    /// other access.
    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.local_store.init(InitOptions::new(epoch)).await?;
        assert_eq!(None, self.epoch.replace(epoch), "should not init twice");
        Ok(())
    }

    pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
        self.store
            .try_wait_epoch(
                HummockReadEpoch::Committed(prev_epoch),
                TryWaitEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await
    }

    pub fn state_store(&self) -> &S {
        &self.store
    }
}

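/// Build a `ConsistentOldValue` op consistency level whose sanity check deserializes both the
/// expected and the actual old value and compares them as rows, so that different encodings of
/// the same row are not reported as a mismatch.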
fn consistent_old_value_op(
    row_serde: Arc<impl ValueRowSerde>,
    is_log_store: bool,
) -> OpConsistencyLevel {
    OpConsistencyLevel::ConsistentOldValue {
        check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
            if first == second {
                return true;
            }
            let first = match row_serde.deserialize(first) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?first, "failed to deserialize serialized value");
                    return false;
                }
            };
            let second = match row_serde.deserialize(second) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?second, "failed to deserialize serialized value");
                    return false;
                }
            };
            if first != second {
                error!(first = ?first, second = ?second, "sanity check failed");
                false
            } else {
                true
            }
        }),
        is_log_store,
    }
}

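/// Op consistency level of a state table, controlling the sanity checks on old values.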
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
    /// Ops are inconsistent; no sanity check is performed.
    Inconsistent,
    /// Ops are consistent:
    /// - an insert must not overwrite an existing key;
    /// - a delete or update must match the previously stored old value.
    ConsistentOldValue,
    /// Same consistency requirement as `ConsistentOldValue`, and additionally the old value is
    /// flushed and stored for the log store.
    LogStoreEnabled,
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create a state table from the table catalog and the state store, with the op consistency
    /// check enabled.
    pub async fn from_table_catalog(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        Self::from_table_catalog_with_consistency_level(
            table_catalog,
            store,
            vnodes,
            StateTableOpConsistencyLevel::ConsistentOldValue,
        )
        .await
    }

    /// Same as `from_table_catalog`, but with the op consistency check disabled.
    pub async fn from_table_catalog_inconsistent_op(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        Self::from_table_catalog_with_consistency_level(
            table_catalog,
            store,
            vnodes,
            StateTableOpConsistencyLevel::Inconsistent,
        )
        .await
    }

    pub async fn from_table_catalog_with_consistency_level(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        consistency_level: StateTableOpConsistencyLevel,
    ) -> Self {
        Self::from_table_catalog_inner(table_catalog, store, vnodes, consistency_level, vec![])
            .await
    }

    /// Create a state table from the table catalog and the state store.
    async fn from_table_catalog_inner(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        op_consistency_level: StateTableOpConsistencyLevel,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        let table_id = TableId::new(table_catalog.id);
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();
        let data_types: Vec<DataType> = table_catalog
            .columns
            .iter()
            .map(|col| {
                col.get_column_desc()
                    .unwrap()
                    .get_column_type()
                    .unwrap()
                    .into()
            })
            .collect();
        let order_types: Vec<OrderType> = table_catalog
            .pk
            .iter()
            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
            .collect();
        let dist_key_indices: Vec<usize> = table_catalog
            .distribution_key
            .iter()
            .map(|dist_index| *dist_index as usize)
            .collect();

        let pk_indices = table_catalog
            .pk
            .iter()
            .map(|col_order| col_order.column_index as usize)
            .collect_vec();

        // Prefer the `dist_key_in_pk` from the catalog; fall back to computing it from the
        // distribution key and pk indices.
        let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() {
            get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices).unwrap()
        } else {
            table_catalog
                .get_dist_key_in_pk()
                .iter()
                .map(|idx| *idx as usize)
                .collect()
        };

        let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
            let vnode_col_idx = *idx as usize;
            pk_indices.iter().position(|&i| vnode_col_idx == i)
        });

        let distribution =
            TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
        assert_eq!(
            distribution.vnode_count(),
            table_catalog.vnode_count(),
            "vnode count mismatch, scanning table {} under wrong distribution?",
            table_catalog.name,
        );

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);

        let input_value_indices = table_catalog
            .value_indices
            .iter()
            .map(|val| *val as usize)
            .collect_vec();

        let no_shuffle_value_indices = (0..table_columns.len()).collect_vec();

        // `None` means all columns are values (no shuffle), so the full row is serialized.
        let value_indices = match input_value_indices.len() == table_columns.len()
            && input_value_indices == no_shuffle_value_indices
        {
            true => None,
            false => Some(input_value_indices),
        };
        let prefix_hint_len = table_catalog.read_prefix_len_hint as usize;

        let row_serde = Arc::new(SD::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.clone().into_boxed_slice()),
        ));

        let state_table_op_consistency_level = op_consistency_level;
        let op_consistency_level = match op_consistency_level {
            StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
            StateTableOpConsistencyLevel::ConsistentOldValue => {
                consistent_old_value_op(row_serde.clone(), false)
            }
            StateTableOpConsistencyLevel::LogStoreEnabled => {
                consistent_old_value_op(row_serde.clone(), true)
            }
        };

        let table_option = TableOption::new(table_catalog.retention_seconds);
        let new_local_options = if IS_REPLICATED {
            NewLocalOptions::new_replicated(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
            )
        } else {
            NewLocalOptions::new(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
            )
        };
        let local_state_store = store.new_local(new_local_options).await;

        // A versioned table (i.e. one whose schema may evolve) must use column-aware value
        // encoding, and vice versa.
        assert_eq!(
            table_catalog.version.is_some(),
            row_serde.kind().is_column_aware()
        );

        // Serializer for the state-cleaning watermark column, if the table has a pk.
        let watermark_serde = if pk_indices.is_empty() {
            None
        } else {
            match table_catalog.clean_watermark_index_in_pk {
                None => Some(pk_serde.index(0)),
                Some(clean_watermark_index_in_pk) => {
                    Some(pk_serde.index(clean_watermark_index_in_pk as usize))
                }
            }
        };
        let max_watermark_of_vnodes = distribution
            .vnodes()
            .iter_vnodes()
            .filter_map(|vnode| local_state_store.get_table_watermark(vnode))
            .max();
        let committed_watermark = if let Some(deser) = watermark_serde
            && let Some(max_watermark) = max_watermark_of_vnodes
        {
            let deserialized = deser.deserialize(&max_watermark).ok().and_then(|row| {
                assert!(row.len() == 1);
                row[0].clone()
            });
            if deserialized.is_none() {
                tracing::error!(
                    vnodes = ?distribution.vnodes(),
                    watermark = ?max_watermark,
                    "Failed to deserialize persisted watermark from state store.",
                );
            }
            deserialized
        } else {
            None
        };

        let watermark_cache = if USE_WATERMARK_CACHE {
            StateTableWatermarkCache::new(WATERMARK_CACHE_ENTRIES)
        } else {
            StateTableWatermarkCache::new(0)
        };

        // Map each output column id to its position in `output_column_ids`.
        let output_column_ids_to_input_idx = output_column_ids
            .iter()
            .enumerate()
            .map(|(pos, id)| (*id, pos))
            .collect::<HashMap<_, _>>();

        let columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|c| c.column_desc.as_ref().unwrap().into())
            .collect_vec();

        // Compute the mapping from the table schema to the output columns.
        let mut i2o_mapping = vec![None; columns.len()];
        for (i, column) in columns.iter().enumerate() {
            if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
                i2o_mapping[i] = Some(*pos);
            }
        }
        let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());

        // Compute the output indices.
        let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);

        Self {
            table_id,
            local_store: local_state_store,
            store,
            epoch: None,
            pk_serde,
            row_serde,
            pk_indices,
            distribution,
            prefix_hint_len,
            table_option,
            value_indices,
            pending_watermark: None,
            committed_watermark,
            watermark_cache,
            data_types,
            output_indices,
            i2o_mapping,
            op_consistency_level: state_table_op_consistency_level,
            clean_watermark_index_in_pk: table_catalog.clean_watermark_index_in_pk,
            on_post_commit: false,
        }
    }

    pub fn get_data_types(&self) -> &[DataType] {
        &self.data_types
    }

    pub fn table_id(&self) -> u32 {
        self.table_id.table_id
    }

    /// Get the vnode value with the given (prefix of the) primary key.
    fn compute_prefix_vnode(&self, pk_prefix: impl Row) -> VirtualNode {
        self.distribution
            .try_compute_vnode_by_pk_prefix(pk_prefix)
            .expect("For streaming, the given prefix must be enough to calculate the vnode")
    }

    /// Get the vnode value of the given primary key.
    pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
        self.distribution.compute_vnode_by_pk(pk)
    }

    pub fn pk_indices(&self) -> &[usize] {
        &self.pk_indices
    }

    /// Get the indices of the primary key columns in the output columns. Returns `None` if any
    /// primary key column is not among the output columns. Only valid for replicated tables.
    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
        assert!(IS_REPLICATED);
        self.pk_indices
            .iter()
            .map(|&i| self.output_indices.iter().position(|&j| i == j))
            .collect()
    }

    pub fn pk_serde(&self) -> &OrderedRowSerde {
        &self.pk_serde
    }

    pub fn vnodes(&self) -> &Arc<Bitmap> {
        self.distribution.vnodes()
    }

    pub fn value_indices(&self) -> &Option<Vec<usize>> {
        &self.value_indices
    }

    pub fn is_consistent_op(&self) -> bool {
        matches!(
            self.op_consistency_level,
            StateTableOpConsistencyLevel::ConsistentOldValue
                | StateTableOpConsistencyLevel::LogStoreEnabled
        )
    }
}

impl<S, SD, const USE_WATERMARK_CACHE: bool> StateTableInner<S, SD, true, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create a replicated state table from the table catalog with the given output column ids.
    pub async fn from_table_catalog_with_output_column_ids(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        Self::from_table_catalog_inner(
            table_catalog,
            store,
            vnodes,
            StateTableOpConsistencyLevel::Inconsistent,
            output_column_ids,
        )
        .await
    }
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Get a single row from the state table.
    pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
        let row_serde = self.row_serde.clone();
        let row = self
            .get_inner(pk, move |_, value| Ok(row_serde.deserialize(value)?))
            .await?;
        match row {
            Some(row) => {
                if IS_REPLICATED {
                    // If the table is replicated, the returned row should only contain the
                    // output columns.
                    let row = row.project(&self.output_indices);
                    Ok(Some(row.into_owned_row()))
                } else {
                    Ok(Some(OwnedRow::new(row)))
                }
            }
            None => Ok(None),
        }
    }

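    /// Get the row's serialized value bytes in the storage encoding, without deserializing it.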
    pub async fn get_encoded_row(&self, pk: impl Row) -> StreamExecutorResult<Option<Bytes>> {
        self.get_inner(pk, |_, value| Ok(Bytes::copy_from_slice(value)))
            .await
    }

    async fn get_inner<O: Send + 'static>(
        &self,
        pk: impl Row,
        on_key_value_fn: impl risingwave_storage::store::KeyValueFn<O>,
    ) -> StreamExecutorResult<Option<O>> {
        assert!(pk.len() <= self.pk_indices.len());

        let serialized_pk =
            serialize_pk_with_vnode(&pk, &self.pk_serde, self.compute_vnode_by_pk(&pk));

        let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() {
            Some(serialized_pk.slice(VirtualNode::SIZE..))
        } else {
            #[cfg(debug_assertions)]
            if self.prefix_hint_len != 0 {
                warn!(
                    "prefix_hint_len is not equal to pk.len(), may not be able to utilize bloom filter"
                );
            }
            None
        };

        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };

        self.local_store
            .on_key_value(serialized_pk, read_options, on_key_value_fn)
            .await
            .map_err(Into::into)
    }
}

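/// A handle returned by [`StateTableInner::commit`]. The caller must consume it by calling
/// [`StateTablePostCommit::post_yield_barrier`] after yielding the barrier downstream,
/// optionally passing the new vnode bitmap carried by the barrier to finish the commit.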
#[must_use]
pub struct StateTablePostCommit<
    'a,
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    inner: &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
}

impl<'a, S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTablePostCommit<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Finish the commit after the barrier has been yielded. If `new_vnodes` is provided, the
    /// vnode bitmap is updated, and the (new, old) bitmaps, the table, and whether caches may be
    /// stale are returned.
    pub async fn post_yield_barrier(
        mut self,
        new_vnodes: Option<Arc<Bitmap>>,
    ) -> StreamExecutorResult<
        Option<(
            (
                Arc<Bitmap>,
                Arc<Bitmap>,
                &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
            ),
            bool,
        )>,
    > {
        self.inner.on_post_commit = false;
        Ok(if let Some(new_vnodes) = new_vnodes {
            let (old_vnodes, cache_may_stale) =
                self.update_vnode_bitmap(new_vnodes.clone()).await?;
            Some(((new_vnodes, old_vnodes, self.inner), cache_may_stale))
        } else {
            None
        })
    }

    pub fn inner(&self) -> &StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE> {
        &*self.inner
    }

    /// Update the vnode bitmap of the state table, returning the previous vnode bitmap and
    /// whether the cache may be stale.
    async fn update_vnode_bitmap(
        &mut self,
        new_vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<(Arc<Bitmap>, bool)> {
        let prev_vnodes = self
            .inner
            .local_store
            .update_vnode_bitmap(new_vnodes.clone())
            .await?;
        assert_eq!(
            &prev_vnodes,
            self.inner.vnodes(),
            "state table and state store vnode bitmaps mismatch"
        );

        if self.inner.distribution.is_singleton() {
            assert_eq!(
                &new_vnodes,
                self.inner.vnodes(),
                "should not update vnode bitmap for singleton table"
            );
        }
        assert_eq!(self.inner.vnodes().len(), new_vnodes.len());

        let cache_may_stale = cache_may_stale(self.inner.vnodes(), &new_vnodes);

        if cache_may_stale {
            self.inner.pending_watermark = None;
            if USE_WATERMARK_CACHE {
                self.inner.watermark_cache.clear();
            }
        }

        Ok((
            self.inner.distribution.update_vnode_bitmap(new_vnodes),
            cache_may_stale,
        ))
    }
}

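// Write interfaces of the state table.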
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    fn handle_mem_table_error(&self, e: StorageError) {
        let e = match e.into_inner() {
            ErrorKind::MemTable(e) => e,
            _ => unreachable!("should only get memtable error"),
        };
        match *e {
            MemTableError::InconsistentOperation { key, prev, new } => {
                let (vnode, key) = deserialize_pk_with_vnode(&key, &self.pk_serde).unwrap();
                panic!(
                    "mem-table operation inconsistent! table_id: {}, vnode: {}, key: {:?}, prev: {}, new: {}",
                    self.table_id(),
                    vnode,
                    &key,
                    prev.debug_fmt(&*self.row_serde),
                    new.debug_fmt(&*self.row_serde),
                )
            }
        }
    }

    fn serialize_value(&self, value: impl Row) -> Bytes {
        if let Some(value_indices) = self.value_indices.as_ref() {
            self.row_serde
                .serialize(value.project(value_indices))
                .into()
        } else {
            self.row_serde.serialize(value).into()
        }
    }

    fn insert_inner(&mut self, key: TableKey<Bytes>, value_bytes: Bytes) {
        insane_mode_discard_point!();
        self.local_store
            .insert(key, value_bytes, None)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn delete_inner(&mut self, key: TableKey<Bytes>, value_bytes: Bytes) {
        insane_mode_discard_point!();
        self.local_store
            .delete(key, value_bytes)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn update_inner(
        &mut self,
        key_bytes: TableKey<Bytes>,
        old_value_bytes: Option<Bytes>,
        new_value_bytes: Bytes,
    ) {
        insane_mode_discard_point!();
        self.local_store
            .insert(key_bytes, new_value_bytes, old_value_bytes)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

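    /// Insert a row into the state table. `value` must be a full row matching the table's
    /// columns; its pk is extracted via `pk_indices` for key encoding.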
    pub fn insert(&mut self, value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&value).project(pk_indices);
        if USE_WATERMARK_CACHE {
            self.watermark_cache.insert(&pk);
        }

        let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk));
        let value_bytes = self.serialize_value(value);
        self.insert_inner(key_bytes, value_bytes);
    }

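    /// Delete a row from the state table. `old_value` must be the full previous row, as it is
    /// used for the consistency sanity check and (if enabled) the log store.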
    pub fn delete(&mut self, old_value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&old_value).project(pk_indices);
        if USE_WATERMARK_CACHE {
            self.watermark_cache.delete(&pk);
        }

        let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk));
        let value_bytes = self.serialize_value(old_value);
        self.delete_inner(key_bytes, value_bytes);
    }

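    /// Update a row. The old and new values must share the same primary key, so the update is
    /// written as a single in-place insert carrying the old value for the sanity check.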
    pub fn update(&mut self, old_value: impl Row, new_value: impl Row) {
        let old_pk = (&old_value).project(self.pk_indices());
        let new_pk = (&new_value).project(self.pk_indices());
        debug_assert!(
            Row::eq(&old_pk, new_pk),
            "pk should not change: {old_pk:?} vs {new_pk:?}",
        );

        let new_key_bytes =
            serialize_pk_with_vnode(new_pk, &self.pk_serde, self.compute_vnode_by_pk(new_pk));
        let old_value_bytes = self.serialize_value(old_value);
        let new_value_bytes = self.serialize_value(new_value);

        self.update_inner(new_key_bytes, Some(old_value_bytes), new_value_bytes);
    }

    /// Write a record (insert / delete / update) into the state table.
    pub fn write_record(&mut self, record: Record<impl Row>) {
        match record {
            Record::Insert { new_row } => self.insert(new_row),
            Record::Delete { old_row } => self.delete(old_row),
            Record::Update { old_row, new_row } => self.update(old_row, new_row),
        }
    }

    fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
        fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
    }

    /// Write a stream chunk of records into the state table, dispatching on each op.
    #[allow(clippy::disallowed_methods)]
    pub fn write_chunk(&mut self, chunk: StreamChunk) {
        let chunk = if IS_REPLICATED {
            self.fill_non_output_indices(chunk)
        } else {
            chunk
        };
        let (chunk, op) = chunk.into_parts();

        let vnodes = self
            .distribution
            .compute_chunk_vnode(&chunk, &self.pk_indices);

        let values = if let Some(ref value_indices) = self.value_indices {
            chunk
                .project(value_indices)
                .serialize_with(&*self.row_serde)
        } else {
            chunk.serialize_with(&*self.row_serde)
        };

        // Serialize the pk of each row together with its vnode.
        let key_chunk = chunk.project(self.pk_indices());
        let vnode_and_pks = key_chunk
            .rows_with_holes()
            .zip_eq_fast(vnodes.iter())
            .map(|(r, vnode)| {
                let mut buffer = BytesMut::new();
                buffer.put_slice(&vnode.to_be_bytes()[..]);
                if let Some(r) = r {
                    self.pk_serde.serialize(r, &mut buffer);
                }
                (r, buffer.freeze())
            })
            .collect_vec();

        if !key_chunk.is_compacted() {
            for ((op, (key, key_bytes), value), vis) in
                izip!(op.iter(), vnode_and_pks, values).zip_eq_debug(key_chunk.visibility().iter())
            {
                if vis {
                    match op {
                        Op::Insert | Op::UpdateInsert => {
                            if USE_WATERMARK_CACHE && let Some(ref pk) = key {
                                self.watermark_cache.insert(pk);
                            }
                            self.insert_inner(TableKey(key_bytes), value);
                        }
                        Op::Delete | Op::UpdateDelete => {
                            if USE_WATERMARK_CACHE && let Some(ref pk) = key {
                                self.watermark_cache.delete(pk);
                            }
                            self.delete_inner(TableKey(key_bytes), value);
                        }
                    }
                }
            }
        } else {
            for (op, (key, key_bytes), value) in izip!(op.iter(), vnode_and_pks, values) {
                match op {
                    Op::Insert | Op::UpdateInsert => {
                        if USE_WATERMARK_CACHE && let Some(ref pk) = key {
                            self.watermark_cache.insert(pk);
                        }
                        self.insert_inner(TableKey(key_bytes), value);
                    }
                    Op::Delete | Op::UpdateDelete => {
                        if USE_WATERMARK_CACHE && let Some(ref pk) = key {
                            self.watermark_cache.delete(pk);
                        }
                        self.delete_inner(TableKey(key_bytes), value);
                    }
                }
            }
        }
    }

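    /// Update the state-cleaning watermark. The watermark is buffered and only takes effect on
    /// the next `commit`, where rows below it become eligible for cleaning.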
    pub fn update_watermark(&mut self, watermark: ScalarImpl) {
        trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark");
        self.pending_watermark = Some(watermark);
    }

    /// Get the last watermark committed through `commit`, restored from the state store on
    /// recovery.
    pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> {
        self.committed_watermark.as_ref()
    }

    /// Commit the buffered operations and the pending watermark to the state store with the new
    /// epoch, returning a [`StateTablePostCommit`] that must be consumed after yielding the
    /// barrier.
    pub async fn commit(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        self.commit_inner(new_epoch, None).await
    }

    #[cfg(test)]
    pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
        self.commit_assert_no_update_vnode_bitmap(new_epoch).await
    }

    pub async fn commit_assert_no_update_vnode_bitmap(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<()> {
        let post_commit = self.commit_inner(new_epoch, None).await?;
        post_commit.post_yield_barrier(None).await?;
        Ok(())
    }

    /// Commit, and switch to the given op consistency level if it differs from the current one.
    pub async fn commit_may_switch_consistent_op(
        &mut self,
        new_epoch: EpochPair,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        if self.op_consistency_level != op_consistency_level {
            info!(
                ?new_epoch,
                prev_op_consistency_level = ?self.op_consistency_level,
                ?op_consistency_level,
                table_id = self.table_id.table_id,
                "switch to new op consistency level"
            );
            self.commit_inner(new_epoch, Some(op_consistency_level))
                .await
        } else {
            self.commit_inner(new_epoch, None).await
        }
    }

    async fn commit_inner(
        &mut self,
        new_epoch: EpochPair,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        assert!(!self.on_post_commit);
        assert_eq!(
            self.epoch.expect("should only be called after init").curr,
            new_epoch.prev
        );
        let switch_op_consistency_level = switch_consistent_op.map(|new_consistency_level| {
            assert_ne!(self.op_consistency_level, new_consistency_level);
            self.op_consistency_level = new_consistency_level;
            match new_consistency_level {
                StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
                StateTableOpConsistencyLevel::ConsistentOldValue => {
                    consistent_old_value_op(self.row_serde.clone(), false)
                }
                StateTableOpConsistencyLevel::LogStoreEnabled => {
                    consistent_old_value_op(self.row_serde.clone(), true)
                }
            }
        });
        trace!(
            table_id = %self.table_id,
            epoch = ?self.epoch,
            "commit state table"
        );

        self.local_store
            .flush()
            .instrument(tracing::info_span!("state_table_flush"))
            .await?;
        let table_watermarks = self.commit_pending_watermark();
        self.local_store.seal_current_epoch(
            new_epoch.curr,
            SealCurrentEpochOptions {
                table_watermarks,
                switch_op_consistency_level,
            },
        );
        self.epoch = Some(new_epoch);

        // Refill the watermark cache if it has been invalidated.
        if USE_WATERMARK_CACHE && !self.watermark_cache.is_synced() {
            if let Some(ref watermark) = self.committed_watermark {
                let range: (Bound<Once<Datum>>, Bound<Once<Datum>>) =
                    (Included(once(Some(watermark.clone()))), Unbounded);
                // Scan the lowest pks at or above the committed watermark across all vnodes and
                // merge them to fill the cache.
                let mut pks = Vec::with_capacity(self.watermark_cache.capacity());
                {
                    let mut streams = vec![];
                    for vnode in self.vnodes().iter_vnodes() {
                        let stream = self
                            .iter_keyed_row_with_vnode(vnode, &range, PrefetchOptions::default())
                            .await?;
                        streams.push(Box::pin(stream));
                    }
                    let merged_stream = merge_sort(streams);
                    pin_mut!(merged_stream);

                    #[for_await]
                    for entry in merged_stream.take(self.watermark_cache.capacity()) {
                        let keyed_row = entry?;
                        let pk = self.pk_serde.deserialize(keyed_row.key())?;
                        // Only cache pks whose watermark column is non-null.
                        if !pk.is_null_at(self.clean_watermark_index_in_pk.unwrap_or(0) as usize) {
                            pks.push(pk);
                        }
                    }
                }

                let mut filler = self.watermark_cache.begin_syncing();
                for pk in pks {
                    filler.insert_unchecked(DefaultOrdered(pk), ());
                }
                filler.finish();

                let n_cache_entries = self.watermark_cache.len();
                if n_cache_entries < self.watermark_cache.capacity() {
                    self.watermark_cache.set_table_row_count(n_cache_entries);
                }
            }
        }

        self.on_post_commit = true;
        Ok(StateTablePostCommit { inner: self })
    }

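    /// Commit the pending watermark, returning the table watermarks (if any) to be sealed with
    /// the current epoch for state cleaning.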
    fn commit_pending_watermark(
        &mut self,
    ) -> Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)> {
        let watermark = self.pending_watermark.take()?;
        trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning");

        assert!(
            !self.pk_indices().is_empty(),
            "unexpected pending watermark on a table with empty pk"
        );
        let watermark_serializer = {
            match self.clean_watermark_index_in_pk {
                None => self.pk_serde.index(0),
                Some(clean_watermark_index_in_pk) => {
                    self.pk_serde.index(clean_watermark_index_in_pk as usize)
                }
            }
        };

        let watermark_type = match self.clean_watermark_index_in_pk {
            None => WatermarkSerdeType::PkPrefix,
            Some(clean_watermark_index_in_pk) => match clean_watermark_index_in_pk {
                0 => WatermarkSerdeType::PkPrefix,
                _ => WatermarkSerdeType::NonPkPrefix,
            },
        };

        let should_clean_watermark = {
            if USE_WATERMARK_CACHE && self.watermark_cache.is_synced() {
                if let Some(key) = self.watermark_cache.lowest_key() {
                    watermark.as_scalar_ref_impl().default_cmp(&key).is_ge()
                } else {
                    // The cache is synced yet empty: there are no rows at or above the
                    // watermark, so there is nothing to clean.
                    false
                }
            } else {
                // Without a synced cache we cannot tell, so conservatively issue the cleaning
                // watermark.
                true
            }
        };

        let watermark_suffix =
            serialize_pk(row::once(Some(watermark.clone())), &watermark_serializer);

        let seal_watermark = if should_clean_watermark {
            trace!(table_id = %self.table_id, watermark = ?watermark_suffix, vnodes = ?{
                self.vnodes().iter_vnodes().collect_vec()
            }, "delete range");

            let order_type = watermark_serializer.get_order_types().first().unwrap();

            if order_type.is_ascending() {
                Some((
                    WatermarkDirection::Ascending,
                    VnodeWatermark::new(
                        self.vnodes().clone(),
                        Bytes::copy_from_slice(watermark_suffix.as_ref()),
                    ),
                    watermark_type,
                ))
            } else {
                Some((
                    WatermarkDirection::Descending,
                    VnodeWatermark::new(
                        self.vnodes().clone(),
                        Bytes::copy_from_slice(watermark_suffix.as_ref()),
                    ),
                    watermark_type,
                ))
            }
        } else {
            None
        };
        self.committed_watermark = Some(watermark);

        // Clear the watermark cache once a cleaning watermark is sealed, since its cached lowest
        // keys may be deleted by the state cleaning.
        if USE_WATERMARK_CACHE && seal_watermark.is_some() {
            self.watermark_cache.clear();
        }

        seal_watermark.map(|(direction, watermark, is_non_pk_prefix)| {
            (direction, vec![watermark], is_non_pk_prefix)
        })
    }

    pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.local_store.try_flush().await?;
        Ok(())
    }
}

pub trait RowStream<'a> = Stream<Item = StreamExecutorResult<OwnedRow>> + 'a;
pub trait KeyedRowStream<'a> = Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a;
pub trait PkRowStream<'a, K> = Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a;

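// Iteration interfaces of the state table.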
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Scan rows from the table with the given `pk_range` under the given `vnode`.
    pub async fn iter_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(deserialize_keyed_row_stream::<'_, ()>(
            self.iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
                .await?,
            &*self.row_serde,
        )
        .map_ok(|(_, row)| row))
    }

    pub async fn iter_keyed_row_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(deserialize_keyed_row_stream(
            self.iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
                .await?,
            &*self.row_serde,
        )
        .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    pub async fn iter_with_vnode_and_output_indices(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        assert!(IS_REPLICATED);
        let stream = self
            .iter_with_vnode(vnode, pk_range, prefetch_options)
            .await?;
        Ok(stream.map(|row| row.map(|row| row.project(&self.output_indices).into_owned_row())))
    }

    async fn iter_kv(
        &self,
        table_key_range: TableKeyRange,
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<<S::Local as LocalStateStore>::Iter<'_>> {
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(self.local_store.iter(table_key_range, read_options).await?)
    }

    async fn rev_iter_kv(
        &self,
        table_key_range: TableKeyRange,
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<<S::Local as LocalStateStore>::RevIter<'_>> {
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(self
            .local_store
            .rev_iter(table_key_range, read_options)
            .await?)
    }

    /// Scan rows that match the given `pk_prefix` and fall within `sub_range`, all under the
    /// vnode determined by the prefix.
    pub async fn iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        let stream = self
            .iter_with_prefix_inner::<false, ()>(pk_prefix, sub_range, prefetch_options)
            .await?;
        Ok(stream.map_ok(|(_, row)| row))
    }

    /// Get the row from a state table that holds at most one row.
    pub async fn get_from_one_row_table(&self) -> StreamExecutorResult<Option<OwnedRow>> {
        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
        let stream = self
            .iter_with_prefix(row::empty(), sub_range, Default::default())
            .await?;
        pin_mut!(stream);

        if let Some(res) = stream.next().await {
            let value = res?.into_owned_row();
            assert!(
                stream.next().await.is_none(),
                "a one-row table should have at most one row"
            );
            Ok(Some(value))
        } else {
            Ok(None)
        }
    }

    /// Get the single datum from a one-row table. Returns `None` if the table is empty or the
    /// datum is NULL.
    pub async fn get_from_one_value_table(&self) -> StreamExecutorResult<Option<ScalarImpl>> {
        Ok(self
            .get_from_one_row_table()
            .await?
            .and_then(|row| row[0].clone()))
    }

    pub async fn iter_keyed_row_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::<false, Bytes>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    /// Scan the table just like `iter_with_prefix`, but in reverse order.
    pub async fn rev_iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::<true, ()>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

    async fn iter_with_prefix_inner<const REVERSE: bool, K: CopyFromSlice>(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
        let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);

        // The given prefix must determine a single vnode.
        let vnode = self.compute_prefix_vnode(&pk_prefix);

        // Construct the prefix hint for the bloom filter.
        let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
        if self.prefix_hint_len != 0 {
            debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
        }
        let prefix_hint = {
            if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() {
                None
            } else {
                let encoded_prefix_len = self
                    .pk_serde
                    .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;

                Some(Bytes::copy_from_slice(
                    &encoded_prefix[..encoded_prefix_len],
                ))
            }
        };

        trace!(
            table_id = %self.table_id(),
            ?prefix_hint, ?pk_prefix,
            ?pk_prefix_indices,
            iter_direction = if REVERSE { "reverse" } else { "forward" },
            "storage_iter_with_prefix"
        );

        let memcomparable_range =
            prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);

        let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode);

        Ok(if REVERSE {
            futures::future::Either::Left(deserialize_keyed_row_stream(
                self.rev_iter_kv(
                    memcomparable_range_with_vnode,
                    prefix_hint,
                    prefetch_options,
                )
                .await?,
                &*self.row_serde,
            ))
        } else {
            futures::future::Either::Right(deserialize_keyed_row_stream(
                self.iter_kv(
                    memcomparable_range_with_vnode,
                    prefix_hint,
                    prefetch_options,
                )
                .await?,
                &*self.row_serde,
            ))
        })
    }

    /// Scan raw key-values from the table with the given `pk_range` under the given `vnode`.
    async fn iter_kv_with_pk_range(
        &self,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<<S::Local as LocalStateStore>::Iter<'_>> {
        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);
        let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode);

        // No prefix hint is provided for a raw pk-range scan.
        self.iter_kv(memcomparable_range_with_vnode, None, prefetch_options)
            .await
    }

    #[cfg(test)]
    pub fn get_watermark_cache(&self) -> &StateTableWatermarkCache {
        &self.watermark_cache
    }
}

1511
1512impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
1513 StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
1514where
1515 S: StateStore,
1516 SD: ValueRowSerde,
1517{
1518 pub async fn iter_log_with_vnode(
1519 &self,
1520 vnode: VirtualNode,
1521 epoch_range: (u64, u64),
1522 pk_range: &(Bound<impl Row>, Bound<impl Row>),
1523 ) -> StreamExecutorResult<impl Stream<Item = StreamExecutorResult<ChangeLogRow>> + '_> {
1524 let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);
1525 let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode);
1526 Ok(deserialize_log_stream(
1527 self.store
1528 .iter_log(
1529 epoch_range,
1530 memcomparable_range_with_vnode,
1531 ReadLogOptions {
1532 table_id: self.table_id,
1533 },
1534 )
1535 .await?,
1536 &*self.row_serde,
1537 )
1538 .map_err(Into::into))
1539 }
1540}
1541
1542fn deserialize_keyed_row_stream<'a, K: CopyFromSlice>(
1543 iter: impl StateStoreIter + 'a,
1544 deserializer: &'a impl ValueRowSerde,
1545) -> impl PkRowStream<'a, K> {
1546 iter.into_stream(move |(key, value)| {
1547 Ok((
1548 K::copy_from_slice(key.user_key.table_key.as_ref()),
1549 deserializer.deserialize(value).map(OwnedRow::new)?,
1550 ))
1551 })
1552 .map_err(Into::into)
1553}
1554
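/// Serialize a pk range of row bounds into memcomparable byte bounds, using each bound row's own
/// length as the serialization prefix length.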
pub fn prefix_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    range: &(Bound<impl Row>, Bound<impl Row>),
) -> (Bound<Bytes>, Bound<Bytes>) {
    (
        start_range_to_memcomparable(pk_serde, &range.0),
        end_range_to_memcomparable(pk_serde, &range.1, None),
    )
}

fn prefix_and_sub_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    sub_range: &(Bound<impl Row>, Bound<impl Row>),
    pk_prefix: impl Row,
) -> (Bound<Bytes>, Bound<Bytes>) {
    let (range_start, range_end) = sub_range;
    let prefix_serializer = pk_serde.prefix(pk_prefix.len());
    let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
    let start_range = match range_start {
        Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))),
        Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))),
        Unbounded => Bound::Included(Either::Right(&pk_prefix)),
    };
    let end_range = match range_end {
        Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)),
        Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)),
        Unbounded => Unbounded,
    };
    (
        start_range_to_memcomparable(pk_serde, &start_range),
        end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
    )
}

fn start_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => Unbounded,
        Included(r) => {
            let serialized = serialize_pk_prefix(r);

            Included(serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);

            start_bound_of_excluded_prefix(&serialized)
        }
    }
}

fn end_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
    serialized_pk_prefix: Option<Bytes>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => match serialized_pk_prefix {
            Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix),
            None => Unbounded,
        },
        Included(r) => {
            let serialized = serialize_pk_prefix(r);

            end_bound_of_prefix(&serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            Excluded(serialized)
        }
    }
}

fn fill_non_output_indices(
    i2o_mapping: &ColIndexMapping,
    data_types: &[DataType],
    chunk: StreamChunk,
) -> StreamChunk {
    let cardinality = chunk.cardinality();
    let (ops, columns, vis) = chunk.into_inner();
    let mut full_columns = Vec::with_capacity(data_types.len());
    for (i, data_type) in data_types.iter().enumerate() {
        if let Some(j) = i2o_mapping.try_map(i) {
            full_columns.push(columns[j].clone());
        } else {
            let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
            column_builder.append_n_null(cardinality);
            let column: ArrayRef = column_builder.finish().into();
            full_columns.push(column)
        }
    }
    let data_chunk = DataChunk::new(full_columns, vis);
    StreamChunk::from_parts(ops, data_chunk)
}

#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    fn check(actual: impl Debug, expect: Expect) {
        let actual = format!("{:#?}", actual);
        expect.assert_eq(&actual);
    }

    #[test]
    fn test_fill_non_output_indices() {
        let data_types = vec![DataType::Int32, DataType::Int32, DataType::Int32];
        let replicated_chunk = [OwnedRow::new(vec![
            Some(222_i32.into()),
            Some(2_i32.into()),
        ])];
        let replicated_chunk = StreamChunk::from_parts(
            vec![Op::Insert],
            DataChunk::from_rows(&replicated_chunk, &[DataType::Int32, DataType::Int32]),
        );
        let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
        let filled_chunk = fill_non_output_indices(&i2o_mapping, &data_types, replicated_chunk);
        check(
            filled_chunk,
            expect![[r#"
                StreamChunk { cardinality: 1, capacity: 1, data:
                +---+---+---+-----+
                | + | 2 |   | 222 |
                +---+---+---+-----+
                }"#]],
        );
    }
}