use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;
use std::ops::Bound;
use std::ops::Bound::*;
use std::sync::Arc;
use std::time::Instant;

use anyhow::anyhow;
use bytes::Bytes;
use either::Either;
use foyer::Hint;
use futures::future::{ready, try_join_all};
use futures::stream::BoxStream;
use futures::{Stream, StreamExt, TryStreamExt, pin_mut};
use itertools::Itertools;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnDesc, ColumnId, TableId, TableOption, get_dist_key_in_pk_indices,
};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
use risingwave_common::row::{self, OwnedRow, Row, RowExt};
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{
    CopyFromSlice, TableKey, end_bound_of_prefix, next_key, prefix_slice_with_vnode,
    prefixed_range_with_vnode, start_bound_of_excluded_prefix,
};
use risingwave_hummock_sdk::table_watermark::{
    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_pb::catalog::Table;
use risingwave_storage::StateStore;
use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::mem_table::MemTableError;
use risingwave_storage::row_serde::find_columns_by_ids;
use risingwave_storage::row_serde::row_serde_util::{
    deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode, serialize_row,
};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::*;
use risingwave_storage::table::{KeyedRow, TableDistribution};
use thiserror_ext::AsReport;
use tracing::{Instrument, trace};

use crate::cache::keyed_cache_may_stale;
use crate::executor::monitor::streaming_stats::StateTableMetrics;
use crate::executor::{StreamExecutorError, StreamExecutorResult};

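// In "insane" consistency mode, randomly discard a fraction of writes to
// exercise the code paths that tolerate inconsistent operations.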
macro_rules! insane_mode_discard_point {
    () => {{
        use rand::Rng;
        if crate::consistency::insane() && rand::rng().random_bool(0.3) {
            return;
        }
    }};
}

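/// Per-vnode key statistics: the smallest and largest table key (excluding the
/// vnode prefix) known for a vnode. Reads whose keys or ranges fall entirely
/// outside `[min_key, max_key]` can be pruned without touching the state store.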
struct VnodeStatistics {
    min_key: Option<Bytes>,
    max_key: Option<Bytes>,
}

impl VnodeStatistics {
    fn new() -> Self {
        Self {
            min_key: None,
            max_key: None,
        }
    }

    fn update_with_key(&mut self, key: &Bytes) {
        if let Some(min) = &self.min_key {
            if key < min {
                self.min_key = Some(key.clone());
            }
        } else {
            self.min_key = Some(key.clone());
        }

        if let Some(max) = &self.max_key {
            if key > max {
                self.max_key = Some(key.clone());
            }
        } else {
            self.max_key = Some(key.clone());
        }
    }

    fn can_prune(&self, key: &Bytes) -> bool {
        if let Some(min) = &self.min_key
            && key < min
        {
            return true;
        }
        if let Some(max) = &self.max_key
            && key > max
        {
            return true;
        }
        false
    }

    fn can_prune_range(&self, start: &Bound<Bytes>, end: &Bound<Bytes>) -> bool {
        if let Some(max) = &self.max_key {
            match start {
                Included(s) if s > max => return true,
                Excluded(s) if s >= max => return true,
                _ => {}
            }
        }
        if let Some(min) = &self.min_key {
            match end {
                Included(e) if e < min => return true,
                Excluded(e) if e <= min => return true,
                _ => {}
            }
        }
        false
    }

    fn pruned_key_range(
        &self,
        start: &Bound<Bytes>,
        end: &Bound<Bytes>,
    ) -> Option<(Bound<Bytes>, Bound<Bytes>)> {
        if self.can_prune_range(start, end) {
            return None;
        }
        let new_start = if let Some(min) = &self.min_key {
            match start {
                Included(s) if s <= min => Included(min.clone()),
                Excluded(s) if s < min => Included(min.clone()),
                _ => start.clone(),
            }
        } else {
            start.clone()
        };

        let new_end = if let Some(max) = &self.max_key {
            match end {
                Included(e) if e >= max => Included(max.clone()),
                Excluded(e) if e > max => Included(max.clone()),
                _ => end.clone(),
            }
        } else {
            end.clone()
        };

        Some((new_start, new_end))
    }
}

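/// The table abstraction that streaming executors use to read and write their
/// state. It wraps a local state store and handles pk/value (de)serialization,
/// vnode-based distribution, and watermark-driven state cleaning: a watermark
/// set via `update_watermark` is buffered in `pending_watermark` and turned
/// into state-store table watermarks on commit, after which it becomes the
/// `committed_watermark`. For replicated tables (`IS_REPLICATED = true`),
/// reads are projected onto `output_indices`.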
pub struct StateTableInner<S, SD = BasicSerde, const IS_REPLICATED: bool = false>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    table_id: TableId,

    row_store: StateTableRowStore<S::Local, SD>,

    store: S,

    epoch: Option<EpochPair>,

    pk_serde: OrderedRowSerde,

    pk_indices: Vec<usize>,

    distribution: TableDistribution,

    prefix_hint_len: usize,

    value_indices: Option<Vec<usize>>,

    pub clean_watermark_index: Option<usize>,
    pending_watermark: Option<ScalarImpl>,
    committed_watermark: Option<ScalarImpl>,
    watermark_serde: Option<(OrderedRowSerde, WatermarkSerdeType)>,

    data_types: Vec<DataType>,

    i2o_mapping: ColIndexMapping,

    pub output_indices: Vec<usize>,

    op_consistency_level: StateTableOpConsistencyLevel,

    on_post_commit: bool,
}

pub type StateTable<S> = StateTableInner<S, BasicSerde>;
pub type ReplicatedStateTable<S, SD> = StateTableInner<S, SD, true>;

impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.row_store
            .init(epoch, self.distribution.vnodes())
            .await?;
        assert_eq!(None, self.epoch.replace(epoch), "should not init twice");
        Ok(())
    }

    pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
        self.store
            .try_wait_epoch(
                HummockReadEpoch::Committed(prev_epoch),
                TryWaitEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await
    }

    pub fn state_store(&self) -> &S {
        &self.store
    }
}

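/// Builds the `ConsistentOldValue` op consistency level. The sanity check
/// first compares the old-value bytes and, only if they differ, falls back to
/// comparing the deserialized rows, since equal rows may have different byte
/// encodings.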
fn consistent_old_value_op(
    row_serde: Arc<impl ValueRowSerde>,
    is_log_store: bool,
) -> OpConsistencyLevel {
    OpConsistencyLevel::ConsistentOldValue {
        check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
            if first == second {
                return true;
            }
            let first = match row_serde.deserialize(first) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?first, "failed to deserialize serialized value");
                    return false;
                }
            };
            let second = match row_serde.deserialize(second) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?second, "failed to deserialize serialized value");
                    return false;
                }
            };
            if first != second {
                error!(first = ?first, second = ?second, "sanity check failed");
                false
            } else {
                true
            }
        }),
        is_log_store,
    }
}

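/// Projects the given row variables onto `value_indices` before evaluating
/// `$body`, or evaluates `$body` on the full rows when all columns are stored
/// (`value_indices` is `None`).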
macro_rules! dispatch_value_indices {
    ($value_indices:expr, [$($row_var_name:ident),+], $body:expr) => {
        if let Some(value_indices) = $value_indices {
            $(
                let $row_var_name = $row_var_name.project(value_indices);
            )+
            $body
        } else {
            $body
        }
    };
}

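/// The row-level storage layer beneath `StateTableInner`. Besides the local
/// state store, it can keep an in-memory mirror of all rows (`all_rows`, keyed
/// by vnode) or per-vnode min/max key statistics (`vnode_stats`) for read
/// pruning; the two are mutually exclusive.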
struct StateTableRowStore<LS: LocalStateStore, SD: ValueRowSerde> {
    state_store: LS,
    all_rows: Option<HashMap<VirtualNode, BTreeMap<Bytes, OwnedRow>>>,

    table_id: TableId,
    row_serde: Arc<SD>,
    pk_serde: OrderedRowSerde,

    vnode_stats: Option<HashMap<VirtualNode, VnodeStatistics>>,
    enable_state_table_vnode_stats_pruning: bool,
    pub metrics: Option<StateTableMetrics>,
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
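    /// Bootstraps `vnode_stats` by probing each vnode with a forward and a
    /// reverse iterator to find its smallest and largest existing key.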
    async fn may_load_vnode_stats(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        if self.vnode_stats.is_none() {
            return Ok(());
        }

        assert!(self.all_rows.is_none());

        let start_time = Instant::now();
        let mut stats_map = HashMap::new();

        for vnode in vnode_bitmap.iter_vnodes() {
            let mut stats = VnodeStatistics::new();

            let memcomparable_range_with_vnode = prefixed_range_with_vnode::<Bytes>(.., vnode);
            let read_options = ReadOptions {
                cache_policy: CachePolicy::Fill(Hint::Low),
                ..Default::default()
            };

            let mut iter = self
                .state_store
                .iter(memcomparable_range_with_vnode.clone(), read_options.clone())
                .await?;
            if let Some(item) = iter.try_next().await? {
                let (key_vnode, key_without_vnode) = item.0.user_key.table_key.split_vnode();
                assert_eq!(vnode, key_vnode);
                stats.min_key = Some(Bytes::copy_from_slice(key_without_vnode));
            }

            let mut rev_iter = self
                .state_store
                .rev_iter(memcomparable_range_with_vnode, read_options)
                .await?;
            if let Some(item) = rev_iter.try_next().await? {
                let (key_vnode, key_without_vnode) = item.0.user_key.table_key.split_vnode();
                assert_eq!(vnode, key_vnode);
                stats.max_key = Some(Bytes::copy_from_slice(key_without_vnode));
            }

            stats_map.insert(vnode, stats);
        }

        self.vnode_stats = Some(stats_map);

        if !cfg!(debug_assertions) {
            info!(
                table_id = %self.table_id,
                vnode_count = vnode_bitmap.count_ones(),
                duration = ?start_time.elapsed(),
                "finished initializing vnode statistics"
            );
        }

        Ok(())
    }

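    /// When `all_rows` is enabled, (re)loads the full content of every vnode in
    /// the bitmap into the in-memory mirror by scanning the state store.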
    async fn may_reload_all_rows(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        if let Some(rows) = &mut self.all_rows {
            rows.clear();
            let start_time = Instant::now();
            *rows = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| {
                let state_store = &self.state_store;
                let row_serde = &self.row_serde;
                async move {
                    let mut rows = BTreeMap::new();
                    let memcomparable_range_with_vnode =
                        prefixed_range_with_vnode::<Bytes>(.., vnode);
                    let stream = deserialize_keyed_row_stream::<Bytes>(
                        state_store
                            .iter(
                                memcomparable_range_with_vnode,
                                ReadOptions {
                                    prefix_hint: None,
                                    prefetch_options: Default::default(),
                                    cache_policy: Default::default(),
                                },
                            )
                            .await?,
                        &**row_serde,
                    );
                    pin_mut!(stream);
                    while let Some((encoded_key, row)) = stream.try_next().await? {
                        let key = TableKey(encoded_key);
                        let (iter_vnode, key) = key.split_vnode_bytes();
                        assert_eq!(vnode, iter_vnode);
                        rows.try_insert(key, row).expect("non-duplicated");
                    }
                    Ok((vnode, rows)) as StreamExecutorResult<_>
                }
            }))
            .await?
            .into_iter()
            .collect();
            if !cfg!(debug_assertions) {
                info!(table_id = %self.table_id, vnode_count = vnode_bitmap.count_ones(), duration = ?start_time.elapsed(), "finished reloading all rows");
            }
        }
        Ok(())
    }

    async fn init(&mut self, epoch: EpochPair, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        self.state_store.init(InitOptions::new(epoch)).await?;
        self.may_reload_all_rows(vnode_bitmap).await?;
        self.may_load_vnode_stats(vnode_bitmap).await
    }

    async fn update_vnode_bitmap(
        &mut self,
        vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<Arc<Bitmap>> {
        let prev_vnodes = self.state_store.update_vnode_bitmap(vnodes.clone()).await?;
        self.may_reload_all_rows(&vnodes).await?;
        self.may_load_vnode_stats(&vnodes).await?;

        Ok(prev_vnodes)
    }

    async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.state_store.try_flush().await?;
        Ok(())
    }

    async fn seal_current_epoch(
        &mut self,
        next_epoch: u64,
        table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<()> {
        if let Some((direction, watermarks, serde_type)) = &table_watermarks
            && let Some(rows) = &mut self.all_rows
        {
            match serde_type {
                WatermarkSerdeType::PkPrefix => {
                    for vnode_watermark in watermarks {
                        match direction {
                            WatermarkDirection::Ascending => {
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    // Keep only keys >= watermark.
                                    *rows = rows.split_off(vnode_watermark.watermark());
                                }
                            }
                            WatermarkDirection::Descending => {
                                let split_off_key = next_key(vnode_watermark.watermark());
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    // Keep only keys <= watermark; drop the rest.
                                    rows.split_off(split_off_key.as_slice());
                                }
                            }
                        }
                    }
                }
                WatermarkSerdeType::NonPkPrefix => {
                    warn!(table_id = %self.table_id, "row preloading disabled: a non-pk-prefix watermark was written");
                    self.all_rows = None;
                }
                WatermarkSerdeType::Value => {
                    warn!(table_id = %self.table_id, "row preloading disabled: a value watermark was written");
                    self.all_rows = None;
                }
            }
        }
        self.state_store
            .flush()
            .instrument(tracing::info_span!("state_table_flush"))
            .await?;
        let switch_op_consistency_level =
            switch_consistent_op.map(|new_consistency_level| match new_consistency_level {
                StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
                StateTableOpConsistencyLevel::ConsistentOldValue => {
                    consistent_old_value_op(self.row_serde.clone(), false)
                }
                StateTableOpConsistencyLevel::LogStoreEnabled => {
                    consistent_old_value_op(self.row_serde.clone(), true)
                }
            });
        self.state_store.seal_current_epoch(
            next_epoch,
            SealCurrentEpochOptions {
                table_watermarks,
                switch_op_consistency_level,
            },
        );
        Ok(())
    }
}

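/// Table-level op consistency setting, mapped to the storage-level
/// [`OpConsistencyLevel`] when the local state store is created.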
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
    /// Writes are applied without sanity checks.
    Inconsistent,
    /// Deletes and updates must carry the correct old value, which is checked
    /// against the stored value.
    ConsistentOldValue,
    /// Same write requirements as `ConsistentOldValue`; additionally, old
    /// values are retained for the log store.
    LogStoreEnabled,
}

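/// Builder for [`StateTableInner`]. The `PreloadAllRow` type parameter is a
/// type-state marker: `build` only becomes available once a preload decision
/// has been made via `enable_preload_all_rows_by_config` or
/// `forbid_preload_all_rows`.
///
/// A minimal usage sketch (`table_catalog`, `store`, `vnodes` and `config` are
/// assumed to come from the caller's context):
///
/// ```ignore
/// let table: StateTable<_> = StateTableBuilder::new(&table_catalog, store, Some(vnodes))
///     .enable_preload_all_rows_by_config(&config)
///     .build()
///     .await;
/// ```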
pub struct StateTableBuilder<'a, S, SD, const IS_REPLICATED: bool, PreloadAllRow> {
    table_catalog: &'a Table,
    store: S,
    vnodes: Option<Arc<Bitmap>>,
    op_consistency_level: Option<StateTableOpConsistencyLevel>,
    output_column_ids: Option<Vec<ColumnId>>,
    preload_all_rows: PreloadAllRow,
    enable_vnode_key_stats: Option<bool>,
    enable_state_table_vnode_stats_pruning: bool,
    metrics: Option<StateTableMetrics>,

    _serde: PhantomData<SD>,
}

impl<'a, S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
    StateTableBuilder<'a, S, SD, IS_REPLICATED, ()>
{
    pub fn new(table_catalog: &'a Table, store: S, vnodes: Option<Arc<Bitmap>>) -> Self {
        Self {
            table_catalog,
            store,
            vnodes,
            op_consistency_level: None,
            output_column_ids: None,
            preload_all_rows: (),
            enable_vnode_key_stats: None,
            enable_state_table_vnode_stats_pruning: false,
            metrics: None,
            _serde: Default::default(),
        }
    }

    fn with_preload_all_rows(
        self,
        preload_all_rows: bool,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, bool> {
        StateTableBuilder {
            table_catalog: self.table_catalog,
            store: self.store,
            vnodes: self.vnodes,
            op_consistency_level: self.op_consistency_level,
            output_column_ids: self.output_column_ids,
            preload_all_rows,
            enable_vnode_key_stats: self.enable_vnode_key_stats,
            enable_state_table_vnode_stats_pruning: self.enable_state_table_vnode_stats_pruning,
            metrics: self.metrics,
            _serde: Default::default(),
        }
    }

    pub fn enable_preload_all_rows_by_config(
        self,
        config: &StreamingConfig,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, bool> {
        let developer = &config.developer;
        let preload_all_rows = if developer.default_enable_mem_preload_state_table {
            !developer
                .mem_preload_state_table_ids_blacklist
                .contains(&self.table_catalog.id.as_raw_id())
        } else {
            developer
                .mem_preload_state_table_ids_whitelist
                .contains(&self.table_catalog.id.as_raw_id())
        };
        self.with_preload_all_rows(preload_all_rows)
    }

    pub fn forbid_preload_all_rows(self) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, bool> {
        self.with_preload_all_rows(false)
    }
}

impl<'a, S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool, PreloadAllRow>
    StateTableBuilder<'a, S, SD, IS_REPLICATED, PreloadAllRow>
{
    pub fn with_op_consistency_level(
        mut self,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> Self {
        self.op_consistency_level = Some(op_consistency_level);
        self
    }

    pub fn with_output_column_ids(mut self, output_column_ids: Vec<ColumnId>) -> Self {
        self.output_column_ids = Some(output_column_ids);
        self
    }

    pub fn enable_vnode_key_stats(mut self, enable: bool, config: &StreamingConfig) -> Self {
        self.enable_vnode_key_stats = Some(enable);
        self.enable_state_table_vnode_stats_pruning =
            enable && config.developer.enable_state_table_vnode_stats_pruning;
        self
    }

    pub fn with_metrics(mut self, metrics: StateTableMetrics) -> Self {
        self.metrics = Some(metrics);
        self
    }
}

impl<'a, S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
    StateTableBuilder<'a, S, SD, IS_REPLICATED, bool>
{
    pub async fn build(self) -> StateTableInner<S, SD, IS_REPLICATED> {
        let mut preload_all_rows = self.preload_all_rows;
        if preload_all_rows
            && let Err(e) =
                risingwave_common::license::Feature::StateTableMemoryPreload.check_available()
        {
            warn!(table_id = %self.table_catalog.id, e = %e.as_report(), "table is configured to preload rows into memory, but the feature is disabled by license");
            preload_all_rows = false;
        }

        // Vnode key stats are redundant when all rows are preloaded in memory.
        let should_enable_vnode_key_stats = if preload_all_rows
            && let Some(enable_vnode_key_stats) = self.enable_vnode_key_stats
            && enable_vnode_key_stats
        {
            false
        } else {
            self.enable_vnode_key_stats.unwrap_or(false)
        };

        StateTableInner::from_table_catalog_inner(
            self.table_catalog,
            self.store,
            self.vnodes,
            self.op_consistency_level
                .unwrap_or(StateTableOpConsistencyLevel::ConsistentOldValue),
            self.output_column_ids.unwrap_or_default(),
            preload_all_rows,
            should_enable_vnode_key_stats,
            self.enable_state_table_vnode_stats_pruning,
            self.metrics,
        )
        .await
    }
}

impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    #[cfg(any(test, feature = "test"))]
    pub async fn from_table_catalog(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    pub async fn from_table_catalog_inconsistent_op(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    #[allow(clippy::too_many_arguments)]
    async fn from_table_catalog_inner(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        op_consistency_level: StateTableOpConsistencyLevel,
        output_column_ids: Vec<ColumnId>,
        preload_all_rows: bool,
        enable_vnode_key_stats: bool,
        enable_state_table_vnode_stats_pruning: bool,
        metrics: Option<StateTableMetrics>,
    ) -> Self {
        let table_id = table_catalog.id;
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();
        let data_types: Vec<DataType> = table_catalog
            .columns
            .iter()
            .map(|col| {
                col.get_column_desc()
                    .unwrap()
                    .get_column_type()
                    .unwrap()
                    .into()
            })
            .collect();
        let order_types: Vec<OrderType> = table_catalog
            .pk
            .iter()
            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
            .collect();
        let dist_key_indices: Vec<usize> = table_catalog
            .distribution_key
            .iter()
            .map(|dist_index| *dist_index as usize)
            .collect();

        let pk_indices = table_catalog
            .pk
            .iter()
            .map(|col_order| col_order.column_index as usize)
            .collect_vec();

        let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() {
            get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices).unwrap()
        } else {
            table_catalog
                .get_dist_key_in_pk()
                .iter()
                .map(|idx| *idx as usize)
                .collect()
        };

        let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
            let vnode_col_idx = *idx as usize;
            pk_indices.iter().position(|&i| vnode_col_idx == i)
        });

        let distribution =
            TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
        assert_eq!(
            distribution.vnode_count(),
            table_catalog.vnode_count(),
            "vnode count mismatch, scanning table {} under wrong distribution?",
            table_catalog.name,
        );

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);

        let input_value_indices = table_catalog
            .value_indices
            .iter()
            .map(|val| *val as usize)
            .collect_vec();

        let no_shuffle_value_indices = (0..table_columns.len()).collect_vec();

        // `None` means all columns are stored in order, i.e. no projection is needed.
        let value_indices = match input_value_indices.len() == table_columns.len()
            && input_value_indices == no_shuffle_value_indices
        {
            true => None,
            false => Some(input_value_indices),
        };
        let prefix_hint_len = table_catalog.read_prefix_len_hint as usize;

        let row_serde = Arc::new(SD::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.clone().into_boxed_slice()),
        ));

        let state_table_op_consistency_level = op_consistency_level;
        let op_consistency_level = match op_consistency_level {
            StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
            StateTableOpConsistencyLevel::ConsistentOldValue => {
                consistent_old_value_op(row_serde.clone(), false)
            }
            StateTableOpConsistencyLevel::LogStoreEnabled => {
                consistent_old_value_op(row_serde.clone(), true)
            }
        };

        let table_option = TableOption::new(table_catalog.retention_seconds);
        let new_local_options = if IS_REPLICATED {
            NewLocalOptions::new_replicated(
                table_id,
                table_catalog.fragment_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
            )
        } else {
            NewLocalOptions::new(
                table_id,
                table_catalog.fragment_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
                true,
            )
        };
        let local_state_store = store.new_local(new_local_options).await;

        assert_eq!(
            table_catalog.version.is_some(),
            row_serde.kind().is_column_aware()
        );

        let output_column_ids_to_input_idx = output_column_ids
            .iter()
            .enumerate()
            .map(|(pos, id)| (*id, pos))
            .collect::<HashMap<_, _>>();

        let columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|c| c.column_desc.as_ref().unwrap().into())
            .collect_vec();

        // Compute the mapping from input column index to output column index.
        let mut i2o_mapping = vec![None; columns.len()];
        for (i, column) in columns.iter().enumerate() {
            if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
                i2o_mapping[i] = Some(*pos);
            }
        }
        let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());

        let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);

        let clean_watermark_indices = table_catalog.get_clean_watermark_column_indices();
        if clean_watermark_indices.len() > 1 {
            unimplemented!("multiple clean watermark columns are not supported yet")
        }
        let clean_watermark_index = clean_watermark_indices.first().map(|&i| i as usize);
        let watermark_serde = clean_watermark_index.map(|idx| {
            let pk_idx = pk_indices.iter().position(|&i| i == idx);
            let (watermark_serde, watermark_serde_type) = match pk_idx {
                Some(0) => (pk_serde.index(0).into_owned(), WatermarkSerdeType::PkPrefix),
                Some(pk_idx) => (
                    pk_serde.index(pk_idx).into_owned(),
                    WatermarkSerdeType::NonPkPrefix,
                ),
                None => (
                    OrderedRowSerde::new(
                        vec![data_types[idx].clone()],
                        vec![OrderType::ascending()],
                    ),
                    WatermarkSerdeType::Value,
                ),
            };
            (watermark_serde, watermark_serde_type)
        });

        let max_watermark_of_vnodes = distribution
            .vnodes()
            .iter_vnodes()
            .filter_map(|vnode| local_state_store.get_table_watermark(vnode))
            .max();
        let committed_watermark = if let Some((deser, _)) = watermark_serde.as_ref()
            && let Some(max_watermark) = max_watermark_of_vnodes
        {
            let deserialized = deser.deserialize(&max_watermark).ok().and_then(|row| {
                assert!(row.len() == 1);
                row[0].clone()
            });
            if deserialized.is_none() {
                tracing::error!(
                    vnodes = ?distribution.vnodes(),
                    watermark = ?max_watermark,
                    "Failed to deserialize persisted watermark from state store.",
                );
            }
            deserialized
        } else {
            None
        };

        Self {
            table_id,
            row_store: StateTableRowStore {
                all_rows: preload_all_rows.then(HashMap::new),
                state_store: local_state_store,
                row_serde,
                pk_serde: pk_serde.clone(),
                table_id,
                vnode_stats: enable_vnode_key_stats.then(HashMap::new),
                enable_state_table_vnode_stats_pruning,
                metrics,
            },
            store,
            epoch: None,
            pk_serde,
            pk_indices,
            distribution,
            prefix_hint_len,
            value_indices,
            pending_watermark: None,
            committed_watermark,
            watermark_serde,
            data_types,
            output_indices,
            i2o_mapping,
            op_consistency_level: state_table_op_consistency_level,
            clean_watermark_index,
            on_post_commit: false,
        }
    }

    pub fn get_data_types(&self) -> &[DataType] {
        &self.data_types
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    fn compute_prefix_vnode(&self, pk_prefix: &impl Row) -> VirtualNode {
        self.distribution
            .try_compute_vnode_by_pk_prefix(pk_prefix)
            .expect("For streaming, the given prefix must be enough to calculate the vnode")
    }

    pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
        self.distribution.compute_vnode_by_pk(pk)
    }

    pub fn pk_indices(&self) -> &[usize] {
        &self.pk_indices
    }

    /// Returns the positions of the pk columns in the output columns, or
    /// `None` if some pk column is not part of the output. Only meaningful for
    /// replicated tables.
    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
        assert!(IS_REPLICATED);
        self.pk_indices
            .iter()
            .map(|&i| self.output_indices.iter().position(|&j| i == j))
            .collect()
    }

    pub fn pk_serde(&self) -> &OrderedRowSerde {
        &self.pk_serde
    }

    pub fn vnodes(&self) -> &Arc<Bitmap> {
        self.distribution.vnodes()
    }

    pub fn value_indices(&self) -> &Option<Vec<usize>> {
        &self.value_indices
    }

    pub fn is_consistent_op(&self) -> bool {
        matches!(
            self.op_consistency_level,
            StateTableOpConsistencyLevel::ConsistentOldValue
                | StateTableOpConsistencyLevel::LogStoreEnabled
        )
    }

    pub fn metrics(&self) -> Option<&StateTableMetrics> {
        self.row_store.metrics.as_ref()
    }
}

impl<S, SD> StateTableInner<S, SD, true>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub async fn new_replicated(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .with_output_column_ids(output_column_ids)
            .forbid_preload_all_rows()
            .build()
            .await
    }
}

impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        let row = self.row_store.get(serialized_pk, prefix_hint).await?;
        match row {
            Some(row) => {
                if IS_REPLICATED {
                    // For replicated tables, only return columns visible in the output.
                    let row = row.project(&self.output_indices);
                    Ok(Some(row.into_owned_row()))
                } else {
                    Ok(Some(row))
                }
            }
            None => Ok(None),
        }
    }

    pub async fn exists(&self, pk: impl Row) -> StreamExecutorResult<bool> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        self.row_store.exists(serialized_pk, prefix_hint).await
    }

    fn serialize_pk(&self, pk: &impl Row) -> TableKey<Bytes> {
        assert!(pk.len() <= self.pk_indices.len());
        serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk))
    }

    fn serialize_pk_and_get_prefix_hint(&self, pk: &impl Row) -> (TableKey<Bytes>, Option<Bytes>) {
        let serialized_pk = self.serialize_pk(&pk);
        let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() {
            Some(serialized_pk.slice(VirtualNode::SIZE..))
        } else {
            #[cfg(debug_assertions)]
            if self.prefix_hint_len != 0 {
                warn!(
                    "prefix_hint_len is not equal to pk.len(), may not be able to utilize bloom filter"
                );
            }
            None
        };
        (serialized_pk, prefix_hint)
    }
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn get(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<Option<OwnedRow>> {
        if let Some(m) = &self.metrics {
            m.get_count.inc();
        }
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode_bytes();
            return Ok(rows.get(&vnode).expect("covered vnode").get(&key).cloned());
        }

        let should_prune = if let Some(stats) = &self.vnode_stats
            && let (vnode, key) = key_bytes.split_vnode_bytes()
            && let Some(vnode_stat) = stats.get(&vnode)
            && vnode_stat.can_prune(&key)
        {
            if let Some(m) = &self.metrics {
                m.get_vnode_pruned_count.inc();
            }
            true
        } else {
            false
        };

        if should_prune && self.enable_state_table_vnode_stats_pruning {
            return Ok(None);
        }

        let read_options = ReadOptions {
            prefix_hint,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };

        let result = self
            .state_store
            .on_key_value(key_bytes, read_options, move |_, value| {
                let row = self.row_serde.deserialize(value)?;
                Ok(OwnedRow::new(row))
            })
            .await
            .map_err(Into::<StreamExecutorError>::into)?;

        // Pruning ran in dry-run mode (pruning disabled): report when the
        // stats would have wrongly pruned a key that actually exists.
        if should_prune && result.is_some() {
            tracing::warn!(
                table_id = %self.table_id,
                "vnode stats pruning dry run failed for get; this does not affect correctness"
            );
        }

        Ok(result)
    }

    async fn exists(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<bool> {
        if let Some(m) = &self.metrics {
            m.get_count.inc();
        }
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode_bytes();
            return Ok(rows.get(&vnode).expect("covered vnode").contains_key(&key));
        }

        let should_prune = if let Some(stats) = &self.vnode_stats
            && let (vnode, key) = key_bytes.split_vnode_bytes()
            && let Some(vnode_stat) = stats.get(&vnode)
            && vnode_stat.can_prune(&key)
        {
            if let Some(m) = &self.metrics {
                m.get_vnode_pruned_count.inc();
            }
            true
        } else {
            false
        };

        if should_prune && self.enable_state_table_vnode_stats_pruning {
            return Ok(false);
        }

        let read_options = ReadOptions {
            prefix_hint,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };
        let result = self
            .state_store
            .on_key_value(key_bytes, read_options, move |_, _| Ok(()))
            .await?;
        let exists = result.is_some();

        if should_prune && exists {
            tracing::warn!(
                table_id = %self.table_id,
                "vnode stats pruning dry run failed for exists; this does not affect correctness"
            );
        }

        Ok(exists)
    }
}

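/// A `#[must_use]` guard returned by `commit`: the caller must call
/// `post_yield_barrier` after yielding the barrier, so that a possible vnode
/// bitmap update (e.g. on scaling) is applied at the right point of the
/// barrier protocol.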
#[must_use]
pub struct StateTablePostCommit<'a, S, SD = BasicSerde, const IS_REPLICATED: bool = false>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    inner: &'a mut StateTableInner<S, SD, IS_REPLICATED>,
}

impl<'a, S, SD, const IS_REPLICATED: bool> StateTablePostCommit<'a, S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub async fn post_yield_barrier(
        mut self,
        new_vnodes: Option<Arc<Bitmap>>,
    ) -> StreamExecutorResult<
        Option<(
            (
                Arc<Bitmap>,
                Arc<Bitmap>,
                &'a mut StateTableInner<S, SD, IS_REPLICATED>,
            ),
            bool,
        )>,
    > {
        self.inner.on_post_commit = false;
        Ok(if let Some(new_vnodes) = new_vnodes {
            let (old_vnodes, keyed_cache_may_stale) =
                self.update_vnode_bitmap(new_vnodes.clone()).await?;
            Some(((new_vnodes, old_vnodes, self.inner), keyed_cache_may_stale))
        } else {
            None
        })
    }

    pub fn inner(&self) -> &StateTableInner<S, SD, IS_REPLICATED> {
        &*self.inner
    }

    async fn update_vnode_bitmap(
        &mut self,
        new_vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<(Arc<Bitmap>, bool)> {
        let prev_vnodes = self
            .inner
            .row_store
            .update_vnode_bitmap(new_vnodes.clone())
            .await?;
        assert_eq!(
            &prev_vnodes,
            self.inner.vnodes(),
            "state table and state store vnode bitmap mismatch"
        );

        if self.inner.distribution.is_singleton() {
            assert_eq!(
                &new_vnodes,
                self.inner.vnodes(),
                "should not update vnode bitmap for singleton table"
            );
        }
        assert_eq!(self.inner.vnodes().len(), new_vnodes.len());

        let keyed_cache_may_stale = keyed_cache_may_stale(self.inner.vnodes(), &new_vnodes);

        if keyed_cache_may_stale {
            self.inner.pending_watermark = None;
        }

        Ok((
            self.inner.distribution.update_vnode_bitmap(new_vnodes),
            keyed_cache_may_stale,
        ))
    }
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
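    /// Turns a mem-table error into a descriptive panic, decoding the
    /// offending key and the previous/new values for the message.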
    fn handle_mem_table_error(&self, e: StorageError) {
        let e = match e.into_inner() {
            ErrorKind::MemTable(e) => e,
            _ => unreachable!("should only get memtable error"),
        };
        match *e {
            MemTableError::InconsistentOperation { key, prev, new, .. } => {
                let (vnode, key) = deserialize_pk_with_vnode(&key, &self.pk_serde).unwrap();
                panic!(
                    "mem-table operation inconsistent! table_id: {}, vnode: {}, key: {:?}, prev: {}, new: {}",
                    self.table_id,
                    vnode,
                    &key,
                    prev.debug_fmt(&*self.row_serde),
                    new.debug_fmt(&*self.row_serde),
                )
            }
        }
    }

    fn insert(&mut self, key: TableKey<Bytes>, value: impl Row) {
        insane_mode_discard_point!();
        let value_bytes = self.row_serde.serialize(&value).into();

        let (vnode, key_without_vnode) = key.split_vnode_bytes();

        // Keep the vnode stats in sync with writes; unnecessary when all rows
        // are mirrored in memory.
        if self.all_rows.is_none()
            && let Some(stats) = &mut self.vnode_stats
            && let Some(vnode_stat) = stats.get_mut(&vnode)
        {
            vnode_stat.update_with_key(&key_without_vnode);
        }

        if let Some(rows) = &mut self.all_rows {
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .insert(key_without_vnode, value.into_owned_row());
        }
        self.state_store
            .insert(key, value_bytes, None)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn delete(&mut self, key: TableKey<Bytes>, value: impl Row) {
        insane_mode_discard_point!();
        let value_bytes = self.row_serde.serialize(value).into();

        let (vnode, key_without_vnode) = key.split_vnode_bytes();

        if self.all_rows.is_none()
            && let Some(stats) = &mut self.vnode_stats
            && let Some(vnode_stat) = stats.get_mut(&vnode)
        {
            vnode_stat.update_with_key(&key_without_vnode);
        }

        if let Some(rows) = &mut self.all_rows {
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .remove(&key_without_vnode);
        }
        self.state_store
            .delete(key, value_bytes)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn update(&mut self, key_bytes: TableKey<Bytes>, old_value: impl Row, new_value: impl Row) {
        insane_mode_discard_point!();
        let new_value_bytes = self.row_serde.serialize(&new_value).into();
        let old_value_bytes = self.row_serde.serialize(old_value).into();

        let (vnode, key_without_vnode) = key_bytes.split_vnode_bytes();

        if self.all_rows.is_none()
            && let Some(stats) = &mut self.vnode_stats
            && let Some(vnode_stat) = stats.get_mut(&vnode)
        {
            vnode_stat.update_with_key(&key_without_vnode);
        }

        if let Some(rows) = &mut self.all_rows {
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .insert(key_without_vnode, new_value.into_owned_row());
        }
        self.state_store
            .insert(key_bytes, new_value_bytes, Some(old_value_bytes))
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }
}

impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub fn insert(&mut self, value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&value).project(pk_indices);

        let key_bytes = self.serialize_pk(&pk);
        dispatch_value_indices!(&self.value_indices, [value], {
            self.row_store.insert(key_bytes, value)
        })
    }

    pub fn delete(&mut self, old_value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&old_value).project(pk_indices);

        let key_bytes = self.serialize_pk(&pk);
        dispatch_value_indices!(&self.value_indices, [old_value], {
            self.row_store.delete(key_bytes, old_value)
        })
    }

    pub fn update(&mut self, old_value: impl Row, new_value: impl Row) {
        let old_pk = (&old_value).project(self.pk_indices());
        let new_pk = (&new_value).project(self.pk_indices());
        debug_assert!(
            Row::eq(&old_pk, new_pk),
            "pk should not change: {old_pk:?} vs {new_pk:?}. {}",
            self.table_id
        );

        let key_bytes = self.serialize_pk(&new_pk);
        dispatch_value_indices!(&self.value_indices, [old_value, new_value], {
            self.row_store.update(key_bytes, old_value, new_value)
        })
    }

    pub fn write_record(&mut self, record: Record<impl Row>) {
        match record {
            Record::Insert { new_row } => self.insert(new_row),
            Record::Delete { old_row } => self.delete(old_row),
            Record::Update { old_row, new_row } => self.update(old_row, new_row),
        }
    }

    fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
        fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
    }

    #[allow(clippy::disallowed_methods)]
    pub fn write_chunk(&mut self, chunk: StreamChunk) {
        let chunk = if IS_REPLICATED {
            self.fill_non_output_indices(chunk)
        } else {
            chunk
        };

        let vnodes = self
            .distribution
            .compute_chunk_vnode(&chunk, &self.pk_indices);

        for (idx, optional_row) in chunk.rows_with_holes().enumerate() {
            let Some((op, row)) = optional_row else {
                continue;
            };
            let pk = row.project(&self.pk_indices);
            let vnode = vnodes[idx];
            let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, vnode);
            match op {
                Op::Insert | Op::UpdateInsert => {
                    dispatch_value_indices!(&self.value_indices, [row], {
                        self.row_store.insert(key_bytes, row);
                    });
                }
                Op::Delete | Op::UpdateDelete => {
                    dispatch_value_indices!(&self.value_indices, [row], {
                        self.row_store.delete(key_bytes, row);
                    });
                }
            }
        }
    }

    pub fn update_watermark(&mut self, watermark: ScalarImpl) {
        trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark");
        self.pending_watermark = Some(watermark);
    }

    pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> {
        self.committed_watermark.as_ref()
    }

    pub async fn commit(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
        self.commit_inner(new_epoch, None).await
    }

    #[cfg(test)]
    pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
        self.commit_assert_no_update_vnode_bitmap(new_epoch).await
    }

    pub async fn commit_assert_no_update_vnode_bitmap(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<()> {
        let post_commit = self.commit_inner(new_epoch, None).await?;
        post_commit.post_yield_barrier(None).await?;
        Ok(())
    }

    pub async fn commit_may_switch_consistent_op(
        &mut self,
        new_epoch: EpochPair,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
        if self.op_consistency_level != op_consistency_level {
            if !cfg!(debug_assertions) {
                info!(
                    ?new_epoch,
                    prev_op_consistency_level = ?self.op_consistency_level,
                    ?op_consistency_level,
                    table_id = %self.table_id,
                    "switch to new op consistency level"
                );
            }
            self.commit_inner(new_epoch, Some(op_consistency_level))
                .await
        } else {
            self.commit_inner(new_epoch, None).await
        }
    }

    async fn commit_inner(
        &mut self,
        new_epoch: EpochPair,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
        assert!(!self.on_post_commit);
        assert_eq!(
            self.epoch.expect("should only be called after init").curr,
            new_epoch.prev
        );
        if let Some(new_consistency_level) = switch_consistent_op {
            assert_ne!(self.op_consistency_level, new_consistency_level);
            self.op_consistency_level = new_consistency_level;
        }
        trace!(
            table_id = %self.table_id,
            epoch = ?self.epoch,
            "commit state table"
        );

        let table_watermarks = self.commit_pending_watermark();
        self.row_store
            .seal_current_epoch(new_epoch.curr, table_watermarks, switch_consistent_op)
            .await?;
        self.epoch = Some(new_epoch);

        self.on_post_commit = true;
        Ok(StateTablePostCommit { inner: self })
    }

    /// Converts the pending watermark into per-vnode table watermarks for the
    /// state store and records it as the committed watermark.
    fn commit_pending_watermark(
        &mut self,
    ) -> Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)> {
        let watermark = self.pending_watermark.take()?;
        trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning");

        assert!(
            !self.pk_indices().is_empty(),
            "unexpected pending watermark on empty pk"
        );
        let (watermark_serializer, watermark_type) = self
            .watermark_serde
            .as_ref()
            .expect("watermark serde should be initialized to commit watermark");
        let watermark_suffix =
            serialize_row(row::once(Some(watermark.clone())), watermark_serializer);
        let vnode_watermark = VnodeWatermark::new(
            self.vnodes().clone(),
            Bytes::copy_from_slice(watermark_suffix.as_ref()),
        );
        trace!(table_id = %self.table_id, ?vnode_watermark, "table watermark");

        let order_type = watermark_serializer.get_order_types().first().unwrap();
        let direction = if order_type.is_ascending() {
            WatermarkDirection::Ascending
        } else {
            WatermarkDirection::Descending
        };

        self.committed_watermark = Some(watermark);
        Some((direction, vec![vnode_watermark], *watermark_type))
    }

    pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.row_store.try_flush().await?;
        Ok(())
    }
}

pub trait RowStream<'a>: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a {}
impl<'a, S: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a> RowStream<'a> for S {}

pub trait KeyedRowStream<'a>: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a {}
impl<'a, S: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a> KeyedRowStream<'a> for S {}

pub trait PkRowStream<'a, K>: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a {}
impl<'a, K, S: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a> PkRowStream<'a, K> for S {}

pub type BoxedRowStream<'a> = BoxStream<'a, StreamExecutorResult<OwnedRow>>;

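/// Rebuilds a full table key (vnode prefix + key bytes) from iterator output.
/// The `()` implementation is used when the caller does not need keys.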
pub trait FromVnodeBytes {
    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self;
}

impl FromVnodeBytes for Bytes {
    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self {
        prefix_slice_with_vnode(vnode, bytes)
    }
}

impl FromVnodeBytes for () {
    fn from_vnode_bytes(_vnode: VirtualNode, _bytes: &Bytes) -> Self {}
}

impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub async fn iter_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_kv_with_pk_range::<()>(pk_range, vnode, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

    pub async fn iter_keyed_row_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    pub async fn iter_with_vnode_and_output_indices(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        assert!(IS_REPLICATED);
        let stream = self
            .iter_with_vnode(vnode, pk_range, prefetch_options)
            .await?;
        Ok(stream.map(|row| row.map(|row| row.project(&self.output_indices).into_owned_row())))
    }
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
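    /// Iterates over a key range within one vnode. With `all_rows` enabled the
    /// range is served from the in-memory mirror; otherwise it is first
    /// narrowed (or fully pruned) using the vnode statistics and then read
    /// from the state store.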
    async fn iter_kv<K: CopyFromSlice + FromVnodeBytes>(
        &self,
        vnode: VirtualNode,
        (start, end): (Bound<Bytes>, Bound<Bytes>),
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        if let Some(m) = &self.metrics {
            m.iter_count.inc();
        }
        let (pruned_start, pruned_end, should_prune_entirely) = if let Some(stats) =
            &self.vnode_stats
            && let Some(vnode_stat) = stats.get(&vnode)
        {
            match vnode_stat.pruned_key_range(&start, &end) {
                Some((new_start, new_end)) => {
                    if self.enable_state_table_vnode_stats_pruning {
                        (new_start, new_end, false)
                    } else {
                        (start, end, false)
                    }
                }
                None => {
                    if let Some(m) = &self.metrics {
                        m.iter_vnode_pruned_count.inc();
                    }
                    (start, end, true)
                }
            }
        } else {
            (start, end, false)
        };

        if should_prune_entirely && self.enable_state_table_vnode_stats_pruning {
            return Ok(futures::future::Either::Left(futures::stream::empty()));
        }

        let table_id = self.table_id;
        let inspect_fn = move |result: &StreamExecutorResult<(K, OwnedRow)>| {
            if should_prune_entirely && result.is_ok() {
                tracing::warn!(
                    table_id = %table_id,
                    "vnode stats pruning dry run failed for iter; this does not affect correctness"
                );
            }
        };

        if let Some(rows) = &self.all_rows {
            return Ok(futures::future::Either::Right(
                futures::future::Either::Left(
                    futures::stream::iter(
                        rows.get(&vnode)
                            .expect("covered vnode")
                            .range((pruned_start, pruned_end))
                            .map(move |(key, value)| {
                                Ok((K::from_vnode_bytes(vnode, key), value.clone()))
                            }),
                    )
                    .inspect(inspect_fn),
                ),
            ));
        }
        let read_options = ReadOptions {
            prefix_hint,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(futures::future::Either::Right(
            futures::future::Either::Right(
                deserialize_keyed_row_stream(
                    self.state_store
                        .iter(
                            prefixed_range_with_vnode((pruned_start, pruned_end), vnode),
                            read_options,
                        )
                        .await?,
                    &*self.row_serde,
                )
                .inspect(inspect_fn),
            ),
        ))
    }

    async fn rev_iter_kv<K: CopyFromSlice + FromVnodeBytes>(
        &self,
        vnode: VirtualNode,
        (start, end): (Bound<Bytes>, Bound<Bytes>),
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        if let Some(m) = &self.metrics {
            m.iter_count.inc();
        }
        let (pruned_start, pruned_end, should_prune_entirely) = if let Some(stats) =
            &self.vnode_stats
            && let Some(vnode_stat) = stats.get(&vnode)
        {
            match vnode_stat.pruned_key_range(&start, &end) {
                Some((new_start, new_end)) => {
                    if self.enable_state_table_vnode_stats_pruning {
                        (new_start, new_end, false)
                    } else {
                        (start, end, false)
                    }
                }
                None => {
                    if let Some(m) = &self.metrics {
                        m.iter_vnode_pruned_count.inc();
                    }
                    (start, end, true)
                }
            }
        } else {
            (start, end, false)
        };

        if should_prune_entirely && self.enable_state_table_vnode_stats_pruning {
            return Ok(futures::future::Either::Left(futures::stream::empty()));
        }

        let table_id = self.table_id;
        let inspect_fn = move |result: &StreamExecutorResult<(K, OwnedRow)>| {
            if should_prune_entirely && result.is_ok() {
                tracing::warn!(
                    table_id = %table_id,
                    "vnode stats pruning dry run failed for rev_iter; this does not affect correctness"
                );
            }
        };

        if let Some(rows) = &self.all_rows {
            return Ok(futures::future::Either::Right(
                futures::future::Either::Left(
                    futures::stream::iter(
                        rows.get(&vnode)
                            .expect("covered vnode")
                            .range((pruned_start, pruned_end))
                            .rev()
                            .map(move |(key, value)| {
                                Ok((K::from_vnode_bytes(vnode, key), value.clone()))
                            }),
                    )
                    .inspect(inspect_fn),
                ),
            ));
        }
        let read_options = ReadOptions {
            prefix_hint,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(futures::future::Either::Right(
            futures::future::Either::Right(
                deserialize_keyed_row_stream(
                    self.state_store
                        .rev_iter(
                            prefixed_range_with_vnode((pruned_start, pruned_end), vnode),
                            read_options,
                        )
                        .await?,
                    &*self.row_serde,
                )
                .inspect(inspect_fn),
            ),
        ))
    }
}

impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub async fn iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        let stream = self
            .iter_with_prefix_inner::<false, ()>(pk_prefix, sub_range, prefetch_options)
            .await?;
        Ok(stream.map_ok(|(_, row)| row))
    }

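    /// Like `iter_with_prefix`, but additionally filters out rows cleaned by a
    /// non-pk-prefix or value watermark. Pk-prefix watermarks need no extra
    /// filtering here, as they are already applied by the storage layer.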
    pub async fn iter_with_prefix_respecting_watermark(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<BoxedRowStream<'_>> {
        let vnode = self.compute_prefix_vnode(&pk_prefix);
        let stream = self
            .iter_with_prefix(pk_prefix, sub_range, prefetch_options)
            .await?;
        let Some(clean_watermark_index) = self.clean_watermark_index else {
            return Ok(stream.boxed());
        };
        let Some((watermark_serde, watermark_type)) = &self.watermark_serde else {
            return Err(StreamExecutorError::from(anyhow!(
                "Missing watermark serde"
            )));
        };
        if matches!(watermark_type, WatermarkSerdeType::PkPrefix) {
            return Ok(stream.boxed());
        }
        let watermark_bytes = self.row_store.state_store.get_table_watermark(vnode);
        let Some(watermark_bytes) = watermark_bytes else {
            return Ok(stream.boxed());
        };
        let watermark_row = watermark_serde.deserialize(&watermark_bytes)?;
        if watermark_row.len() != 1 {
            return Err(StreamExecutorError::from(anyhow!(
                "Watermark row should have exactly 1 column, got {}",
                watermark_row.len()
            )));
        }
        let watermark_value = watermark_row[0].clone();
        if watermark_value.is_none() {
            return Err(StreamExecutorError::from(anyhow!(
                "Watermark cannot be NULL"
            )));
        }
        let order_type = watermark_serde.get_order_types().first().ok_or_else(|| {
            StreamExecutorError::from(anyhow!(
                "Watermark serde should have at least one order type"
            ))
        })?;
        let direction = if order_type.is_ascending() {
            WatermarkDirection::Ascending
        } else {
            WatermarkDirection::Descending
        };
        let stream = stream.try_filter_map(move |row| {
            let watermark_col_value = row.datum_at(clean_watermark_index);
            let should_filter = direction.datum_filter_by_watermark(
                watermark_col_value,
                &watermark_value,
                *order_type,
            );
            if should_filter {
                ready(Ok(None))
            } else {
                ready(Ok(Some(row)))
            }
        });
        Ok(stream.boxed())
    }

    pub async fn get_from_one_row_table(&self) -> StreamExecutorResult<Option<OwnedRow>> {
        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
        let stream = self
            .iter_with_prefix(row::empty(), sub_range, Default::default())
            .await?;
        pin_mut!(stream);

        if let Some(res) = stream.next().await {
            let value = res?.into_owned_row();
            assert!(stream.next().await.is_none());
            Ok(Some(value))
        } else {
            Ok(None)
        }
    }

    pub async fn get_from_one_value_table(&self) -> StreamExecutorResult<Option<ScalarImpl>> {
        Ok(self
            .get_from_one_row_table()
            .await?
            .and_then(|row| row[0].clone()))
    }

    pub async fn iter_keyed_row_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::<false, Bytes>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    pub async fn rev_iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::<true, ()>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

    async fn iter_with_prefix_inner<const REVERSE: bool, K: CopyFromSlice + FromVnodeBytes>(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
        let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);

        let vnode = self.compute_prefix_vnode(&pk_prefix);

        let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
        if self.prefix_hint_len != 0 {
            debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
        }
        // Construct the prefix hint for the storage-layer bloom filter, if the
        // prefix is long enough.
        let prefix_hint = {
            if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() {
                None
            } else {
                let encoded_prefix_len = self
                    .pk_serde
                    .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;

                Some(Bytes::copy_from_slice(
                    &encoded_prefix[..encoded_prefix_len],
                ))
            }
        };

        trace!(
            table_id = %self.table_id(),
            ?prefix_hint, ?pk_prefix,
            ?pk_prefix_indices,
            iter_direction = if REVERSE { "reverse" } else { "forward" },
            "storage_iter_with_prefix"
        );

        let memcomparable_range =
            prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);

        Ok(if REVERSE {
            futures::future::Either::Left(
                self.row_store
                    .rev_iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
                    .await?,
            )
        } else {
            futures::future::Either::Right(
                self.row_store
                    .iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
                    .await?,
            )
        })
    }

    async fn iter_kv_with_pk_range<'a, K: CopyFromSlice + FromVnodeBytes>(
        &'a self,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'a, K>> {
        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);

        self.row_store
            .iter_kv(vnode, memcomparable_range, None, prefetch_options)
            .await
    }
}

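/// Deserializes a raw state-store iterator into a stream of `(key, row)`
/// pairs; the key type `K` decides whether the table key is copied out
/// (`Bytes`) or discarded (`()`).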
fn deserialize_keyed_row_stream<'a, K: CopyFromSlice>(
    iter: impl StateStoreIter + 'a,
    deserializer: &'a impl ValueRowSerde,
) -> impl PkRowStream<'a, K> {
    iter.into_stream(move |(key, value)| {
        Ok((
            K::copy_from_slice(key.user_key.table_key.as_ref()),
            deserializer.deserialize(value).map(OwnedRow::new)?,
        ))
    })
    .map_err(Into::into)
}

pub fn prefix_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    range: &(Bound<impl Row>, Bound<impl Row>),
) -> (Bound<Bytes>, Bound<Bytes>) {
    (
        start_range_to_memcomparable(pk_serde, &range.0),
        end_range_to_memcomparable(pk_serde, &range.1, None),
    )
}

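/// Serializes a pk prefix plus a sub-range over the remaining pk columns into
/// a memcomparable key range: the prefix is chained onto each bound, and an
/// unbounded end is capped by the end bound of the serialized prefix.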
fn prefix_and_sub_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    sub_range: &(Bound<impl Row>, Bound<impl Row>),
    pk_prefix: impl Row,
) -> (Bound<Bytes>, Bound<Bytes>) {
    let (range_start, range_end) = sub_range;
    let prefix_serializer = pk_serde.prefix(pk_prefix.len());
    let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
    let start_range = match range_start {
        Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))),
        Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))),
        Unbounded => Bound::Included(Either::Right(&pk_prefix)),
    };
    let end_range = match range_end {
        Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)),
        Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)),
        Unbounded => Unbounded,
    };
    (
        start_range_to_memcomparable(pk_serde, &start_range),
        end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
    )
}

fn start_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => Unbounded,
        Included(r) => {
            let serialized = serialize_pk_prefix(r);
            Included(serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            start_bound_of_excluded_prefix(&serialized)
        }
    }
}

fn end_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
    serialized_pk_prefix: Option<Bytes>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => match serialized_pk_prefix {
            Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix),
            None => Unbounded,
        },
        Included(r) => {
            let serialized = serialize_pk_prefix(r);
            end_bound_of_prefix(&serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            Excluded(serialized)
        }
    }
}

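/// For replicated tables: expands an output-projected chunk back to the full
/// table schema, filling the non-output columns with NULLs.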
fn fill_non_output_indices(
    i2o_mapping: &ColIndexMapping,
    data_types: &[DataType],
    chunk: StreamChunk,
) -> StreamChunk {
    let cardinality = chunk.cardinality();
    let (ops, columns, vis) = chunk.into_inner();
    let mut full_columns = Vec::with_capacity(data_types.len());
    for (i, data_type) in data_types.iter().enumerate() {
        if let Some(j) = i2o_mapping.try_map(i) {
            full_columns.push(columns[j].clone());
        } else {
            let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
            column_builder.append_n_null(cardinality);
            let column: ArrayRef = column_builder.finish().into();
            full_columns.push(column)
        }
    }
    let data_chunk = DataChunk::new(full_columns, vis);
    StreamChunk::from_parts(ops, data_chunk)
}

#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    fn check(actual: impl Debug, expect: Expect) {
        let actual = format!("{:#?}", actual);
        expect.assert_eq(&actual);
    }

    #[test]
    fn test_fill_non_output_indices() {
        let data_types = vec![DataType::Int32, DataType::Int32, DataType::Int32];
        let replicated_chunk = [OwnedRow::new(vec![
            Some(222_i32.into()),
            Some(2_i32.into()),
        ])];
        let replicated_chunk = StreamChunk::from_parts(
            vec![Op::Insert],
            DataChunk::from_rows(&replicated_chunk, &[DataType::Int32, DataType::Int32]),
        );
        let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
        let filled_chunk = fill_non_output_indices(&i2o_mapping, &data_types, replicated_chunk);
        check(
            filled_chunk,
            expect![[r#"
                StreamChunk { cardinality: 1, capacity: 1, data:
                +---+---+---+-----+
                | + | 2 |   | 222 |
                +---+---+---+-----+
                }"#]],
        );
    }
}