1use std::collections::{BTreeMap, HashMap};
16use std::marker::PhantomData;
17use std::ops::Bound;
18use std::ops::Bound::*;
19use std::sync::Arc;
20use std::time::Instant;
21
22use anyhow::anyhow;
23use bytes::Bytes;
24use either::Either;
25use foyer::Hint;
26use futures::future::{ready, try_join_all};
27use futures::stream::BoxStream;
28use futures::{Stream, StreamExt, TryStreamExt, pin_mut};
29use itertools::Itertools;
30use risingwave_common::array::stream_record::Record;
31use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
32use risingwave_common::bitmap::Bitmap;
33use risingwave_common::catalog::{
34 ColumnDesc, ColumnId, TableId, TableOption, get_dist_key_in_pk_indices,
35};
36use risingwave_common::config::StreamingConfig;
37use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
38use risingwave_common::id::FragmentId;
39use risingwave_common::row::{self, OwnedRow, Row, RowExt};
40use risingwave_common::types::{DataType, ScalarImpl};
41use risingwave_common::util::column_index_mapping::ColIndexMapping;
42use risingwave_common::util::epoch::EpochPair;
43use risingwave_common::util::row_serde::OrderedRowSerde;
44use risingwave_common::util::sort_util::{OrderType, cmp_datum};
45use risingwave_common::util::value_encoding::BasicSerde;
46use risingwave_hummock_sdk::HummockReadEpoch;
47use risingwave_hummock_sdk::key::{
48 CopyFromSlice, TableKey, end_bound_of_prefix, next_key, prefix_slice_with_vnode,
49 prefixed_range_with_vnode, start_bound_of_excluded_prefix,
50};
51use risingwave_hummock_sdk::table_watermark::{
52 VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
53};
54use risingwave_pb::catalog::Table;
55use risingwave_pb::plan_common::StorageTableDesc;
56use risingwave_storage::StateStore;
57use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
58use risingwave_storage::hummock::CachePolicy;
59use risingwave_storage::mem_table::MemTableError;
60use risingwave_storage::row_serde::find_columns_by_ids;
61use risingwave_storage::row_serde::row_serde_util::{
62 deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode, serialize_row,
63};
64use risingwave_storage::row_serde::value_serde::ValueRowSerde;
65use risingwave_storage::store::*;
66use risingwave_storage::table::{KeyedRow, TableDistribution, should_calculate_prefix_hint};
67use thiserror_ext::AsReport;
68use tracing::{Instrument, trace};
69
70use crate::cache::keyed_cache_may_stale;
71use crate::executor::monitor::streaming_stats::StateTableMetrics;
72use crate::executor::{StreamExecutorError, StreamExecutorResult};
73
/// Randomly discards the rest of the enclosing function in "insane" consistency mode.
///
/// When `crate::consistency::insane()` is true, this returns early from the *enclosing*
/// function with probability 0.3, so any statement after the invocation is skipped.
/// Because it expands to a bare `return;`, it can only be invoked inside functions whose
/// return type is `()`.
macro_rules! insane_mode_discard_point {
    () => {{
        use rand::Rng;
        if crate::consistency::insane() && rand::rng().random_bool(0.3) {
            return;
        }
    }};
}
84
/// Smallest and largest key known to exist in one vnode.
///
/// Used to decide that a point lookup or a range scan cannot hit any key of the vnode,
/// and to narrow range bounds to `[min_key, max_key]`. When loaded by
/// `StateTableRowStore::may_load_vnode_stats`, keys are stored with the vnode prefix
/// stripped.
struct VnodeStatistics {
    /// Smallest known key; `None` when no key has been observed.
    min_key: Option<Bytes>,
    /// Largest known key; `None` when no key has been observed.
    max_key: Option<Bytes>,
}
92
93impl VnodeStatistics {
94 fn new() -> Self {
95 Self {
96 min_key: None,
97 max_key: None,
98 }
99 }
100
101 fn update_with_key(&mut self, key: &Bytes) {
102 if let Some(min) = &self.min_key {
103 if key < min {
104 self.min_key = Some(key.clone());
105 }
106 } else {
107 self.min_key = Some(key.clone());
108 }
109
110 if let Some(max) = &self.max_key {
111 if key > max {
112 self.max_key = Some(key.clone());
113 }
114 } else {
115 self.max_key = Some(key.clone());
116 }
117 }
118
119 fn can_prune(&self, key: &Bytes) -> bool {
120 if let Some(min) = &self.min_key
121 && key < min
122 {
123 return true;
124 }
125 if let Some(max) = &self.max_key
126 && key > max
127 {
128 return true;
129 }
130 false
131 }
132
133 fn can_prune_range(&self, start: &Bound<Bytes>, end: &Bound<Bytes>) -> bool {
134 if let Some(max) = &self.max_key {
136 match start {
137 Included(s) if s > max => return true,
138 Excluded(s) if s >= max => return true,
139 _ => {}
140 }
141 }
142 if let Some(min) = &self.min_key {
143 match end {
144 Included(e) if e < min => return true,
145 Excluded(e) if e <= min => return true,
146 _ => {}
147 }
148 }
149 false
150 }
151
152 fn pruned_key_range(
153 &self,
154 start: &Bound<Bytes>,
155 end: &Bound<Bytes>,
156 ) -> Option<(Bound<Bytes>, Bound<Bytes>)> {
157 if self.can_prune_range(start, end) {
158 return None;
159 }
160 let new_start = if let Some(min) = &self.min_key {
161 match start {
162 Included(s) if s <= min => Included(min.clone()),
163 Excluded(s) if s < min => Included(min.clone()),
164 _ => start.clone(),
165 }
166 } else {
167 start.clone()
168 };
169
170 let new_end = if let Some(max) = &self.max_key {
171 match end {
172 Included(e) if e >= max => Included(max.clone()),
173 Excluded(e) if e > max => Included(max.clone()),
174 _ => end.clone(),
175 }
176 } else {
177 end.clone()
178 };
179
180 Some((new_start, new_end))
181 }
182}
183
/// A streaming state table backed by a [`StateStore`].
///
/// `SD` is the value row serde. When `IS_REPLICATED` is true, the table is a replicated
/// one: the local store is created with `NewLocalOptions::new_replicated`, and rows read
/// back are projected onto `output_indices`.
pub struct StateTableInner<S, SD = BasicSerde, const IS_REPLICATED: bool = false>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Id of the table in the catalog.
    table_id: TableId,

    /// Row access layer over the local state store (optionally with preloaded rows or
    /// per-vnode key statistics).
    row_store: StateTableRowStore<S::Local, SD>,

    /// The shared (global) state store; used e.g. to wait for committed epochs.
    store: S,

    /// Current epoch; `None` until `init_epoch` is called.
    epoch: Option<EpochPair>,

    /// Memcomparable serde of the primary key columns.
    pk_serde: OrderedRowSerde,

    /// Indices of the primary key columns within the table's columns.
    pk_indices: Vec<usize>,

    /// Distribution information (owned vnodes, distribution key positions in the pk).
    distribution: TableDistribution,

    /// Number of leading pk columns used to compute the read prefix hint
    /// (presumably for bloom-filter lookups — see `serialize_pk_and_get_prefix_hint`).
    prefix_hint_len: usize,

    /// Indices of the columns stored in the value; `None` when they are exactly all
    /// columns in order.
    value_indices: Option<Vec<usize>>,

    /// Index (into the table's columns) of the column whose watermark cleans state, if any.
    pub clean_watermark_index: Option<usize>,
    /// NOTE(review): presumably a watermark received in the current epoch and not yet
    /// committed — its updates are not visible in this section; confirm.
    pending_watermark: Option<ScalarImpl>,
    /// Watermark already persisted; initialized from the store as the maximum over vnodes.
    committed_watermark: Option<ScalarImpl>,
    /// Serde of the clean-watermark column and whether it is the pk prefix, a non-prefix
    /// pk column, or a value column.
    watermark_serde: Option<(OrderedRowSerde, WatermarkSerdeType)>,

    /// Data types of all table columns.
    data_types: Vec<DataType>,

    /// Maps each table column index to its position among the output columns, if output.
    i2o_mapping: ColIndexMapping,

    /// Indices (into the table's columns) of the output columns.
    pub output_indices: Vec<usize>,

    /// Consistency level requested for operations on this table.
    op_consistency_level: StateTableOpConsistencyLevel,

    /// Initialized to `false`. NOTE(review): its use lies outside this section; presumably it
    /// marks that a post-commit step is pending — confirm.
    on_post_commit: bool,
}
254
/// A non-replicated state table using the basic value encoding.
pub type StateTable<S> = StateTableInner<S, BasicSerde>;
/// A replicated state table with a caller-chosen value serde.
pub type ReplicatedStateTable<S, SD> = StateTableInner<S, SD, true>;
260
261impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
263where
264 S: StateStore,
265 SD: ValueRowSerde,
266{
267 pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
270 self.row_store
271 .init(epoch, self.distribution.vnodes())
272 .await?;
273 assert_eq!(None, self.epoch.replace(epoch), "should not init for twice");
274 Ok(())
275 }
276
277 pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
278 self.store
279 .try_wait_epoch(
280 HummockReadEpoch::Committed(prev_epoch),
281 TryWaitEpochOptions {
282 table_id: self.table_id,
283 },
284 )
285 .await
286 }
287
288 pub fn state_store(&self) -> &S {
289 &self.store
290 }
291}
292
293fn consistent_old_value_op(
294 row_serde: Arc<impl ValueRowSerde>,
295 is_log_store: bool,
296) -> OpConsistencyLevel {
297 OpConsistencyLevel::ConsistentOldValue {
298 check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
299 if first == second {
300 return true;
301 }
302 let first = match row_serde.deserialize(first) {
303 Ok(rows) => rows,
304 Err(e) => {
305 error!(error = %e.as_report(), value = ?first, "fail to deserialize serialized value");
306 return false;
307 }
308 };
309 let second = match row_serde.deserialize(second) {
310 Ok(rows) => rows,
311 Err(e) => {
312 error!(error = %e.as_report(), value = ?second, "fail to deserialize serialized value");
313 return false;
314 }
315 };
316 if first != second {
317 error!(first = ?first, second = ?second, "sanity check fail");
318 false
319 } else {
320 true
321 }
322 }),
323 is_log_store,
324 }
325}
326
/// Evaluates `$body` with the listed row variables projected onto `$value_indices`.
///
/// If `$value_indices` is `Some(indices)`, each `$row_var_name` is shadowed by
/// `$row_var_name.project(indices)` before `$body` runs. Otherwise `$body` runs with the
/// rows unchanged.
macro_rules! dispatch_value_indices {
    ($value_indices:expr, [$($row_var_name:ident),+], $body:expr) => {
        if let Some(value_indices) = $value_indices {
            $(
                let $row_var_name = $row_var_name.project(value_indices);
            )+
            $body
        } else {
            // `$body` is expanded a second time here so that it can type-check against the
            // unprojected row types as well as the projected ones above.
            $body
        }
    };
}
339
/// Row access layer of a state table over a [`LocalStateStore`].
///
/// It can optionally keep either an in-memory copy of all rows (`all_rows`) or per-vnode
/// min/max key statistics (`vnode_stats`). The two are never enabled at the same time.
struct StateTableRowStore<LS: LocalStateStore, SD: ValueRowSerde> {
    /// The local state store that reads and writes go to.
    state_store: LS,
    /// When `Some`, every row of every owned vnode, keyed by the serialized pk *without*
    /// the vnode prefix. It is trimmed by pk-prefix watermarks and dropped (set to `None`) by
    /// other watermark kinds.
    all_rows: Option<HashMap<VirtualNode, BTreeMap<Bytes, OwnedRow>>>,

    /// Id of the table (used for logging).
    table_id: TableId,
    /// Serde of the stored value rows.
    row_serde: Arc<SD>,
    /// Memcomparable serde of the primary key.
    pk_serde: OrderedRowSerde,

    /// When `Some`, min/max key statistics per owned vnode (keys without vnode prefix).
    vnode_stats: Option<HashMap<VirtualNode, VnodeStatistics>>,
    /// Whether the statistics may be used to prune reads.
    enable_state_table_vnode_stats_pruning: bool,
    /// Optional metrics for this state table.
    pub metrics: Option<StateTableMetrics>,
}
362
363impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
364 async fn may_load_vnode_stats(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
365 if self.vnode_stats.is_none() {
366 return Ok(());
367 }
368
369 assert!(self.all_rows.is_none());
371
372 let start_time = Instant::now();
373 let mut stats_map = HashMap::new();
374
375 for vnode in vnode_bitmap.iter_vnodes() {
377 let mut stats = VnodeStatistics::new();
378
379 let memcomparable_range_with_vnode = prefixed_range_with_vnode::<Bytes>(.., vnode);
381 let read_options = ReadOptions {
382 cache_policy: CachePolicy::Fill(Hint::Low),
383 ..Default::default()
384 };
385
386 let mut iter = self
387 .state_store
388 .iter(memcomparable_range_with_vnode.clone(), read_options.clone())
389 .await?;
390 if let Some(item) = iter.try_next().await? {
391 let (key_vnode, key_without_vnode) = item.0.user_key.table_key.split_vnode();
392 assert_eq!(vnode, key_vnode);
393 stats.min_key = Some(Bytes::copy_from_slice(key_without_vnode));
394 }
395
396 let mut rev_iter = self
398 .state_store
399 .rev_iter(memcomparable_range_with_vnode, read_options)
400 .await?;
401 if let Some(item) = rev_iter.try_next().await? {
402 let (key_vnode, key_without_vnode) = item.0.user_key.table_key.split_vnode();
403 assert_eq!(vnode, key_vnode);
404 stats.max_key = Some(Bytes::copy_from_slice(key_without_vnode));
405 }
406
407 stats_map.insert(vnode, stats);
408 }
409
410 self.vnode_stats = Some(stats_map);
411
412 if !cfg!(debug_assertions) {
414 info!(
415 table_id = %self.table_id,
416 vnode_count = vnode_bitmap.count_ones(),
417 duration = ?start_time.elapsed(),
418 "finished initializing vnode statistics"
419 );
420 }
421
422 Ok(())
423 }
424
425 async fn may_reload_all_rows(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
426 if let Some(rows) = &mut self.all_rows {
427 rows.clear();
428 let start_time = Instant::now();
429 *rows = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| {
430 let state_store = &self.state_store;
431 let row_serde = &self.row_serde;
432 async move {
433 let mut rows = BTreeMap::new();
434 let memcomparable_range_with_vnode =
435 prefixed_range_with_vnode::<Bytes>(.., vnode);
436 let stream = deserialize_keyed_row_stream::<Bytes>(
438 state_store
439 .iter(
440 memcomparable_range_with_vnode,
441 ReadOptions {
442 prefix_hint: None,
443 prefetch_options: Default::default(),
444 cache_policy: Default::default(),
445 },
446 )
447 .await?,
448 &**row_serde,
449 );
450 pin_mut!(stream);
451 while let Some((encoded_key, row)) = stream.try_next().await? {
452 let key = TableKey(encoded_key);
453 let (iter_vnode, key) = key.split_vnode_bytes();
454 assert_eq!(vnode, iter_vnode);
455 rows.try_insert(key, row).expect("non-duplicated");
456 }
457 Ok((vnode, rows)) as StreamExecutorResult<_>
458 }
459 }))
460 .await?
461 .into_iter()
462 .collect();
463 if !cfg!(debug_assertions) {
465 info!(table_id = %self.table_id, vnode_count = vnode_bitmap.count_ones(), duration = ?start_time.elapsed(),"finished reloading all rows");
466 }
467 }
468 Ok(())
469 }
470
471 async fn init(&mut self, epoch: EpochPair, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
472 self.state_store.init(InitOptions::new(epoch)).await?;
473 self.may_reload_all_rows(vnode_bitmap).await?;
474 self.may_load_vnode_stats(vnode_bitmap).await
475 }
476
477 async fn update_vnode_bitmap(
478 &mut self,
479 vnodes: Arc<Bitmap>,
480 ) -> StreamExecutorResult<Arc<Bitmap>> {
481 let prev_vnodes = self.state_store.update_vnode_bitmap(vnodes.clone()).await?;
482 self.may_reload_all_rows(&vnodes).await?;
483 self.may_load_vnode_stats(&vnodes).await?;
484
485 Ok(prev_vnodes)
486 }
487
488 async fn try_flush(&mut self) -> StreamExecutorResult<()> {
489 self.state_store.try_flush().await?;
490 Ok(())
491 }
492
493 async fn seal_current_epoch(
494 &mut self,
495 next_epoch: u64,
496 table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
497 switch_consistent_op: Option<StateTableOpConsistencyLevel>,
498 ) -> StreamExecutorResult<()> {
499 if let Some((direction, watermarks, serde_type)) = &table_watermarks
500 && let Some(rows) = &mut self.all_rows
501 {
502 match serde_type {
503 WatermarkSerdeType::PkPrefix => {
504 for vnode_watermark in watermarks {
505 match direction {
506 WatermarkDirection::Ascending => {
507 for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
508 let rows = rows.get_mut(&vnode).expect("covered vnode");
509 *rows = rows.split_off(vnode_watermark.watermark());
511 }
512 }
513 WatermarkDirection::Descending => {
514 let split_off_key = next_key(vnode_watermark.watermark());
516 for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
517 let rows = rows.get_mut(&vnode).expect("covered vnode");
518 rows.split_off(split_off_key.as_slice());
521 }
522 }
523 }
524 }
525 }
526 WatermarkSerdeType::NonPkPrefix => {
527 warn!(table_id = %self.table_id, "table enabled preloading rows got disabled by written non pk prefix watermark");
528 self.all_rows = None;
529 }
530 WatermarkSerdeType::Value => {
531 warn!(table_id = %self.table_id, "table enabled preloading rows got disabled by written value watermark");
532 self.all_rows = None;
533 }
534 }
535 }
536 self.state_store
537 .flush()
538 .instrument(tracing::info_span!("state_table_flush"))
539 .await?;
540 let switch_op_consistency_level =
541 switch_consistent_op.map(|new_consistency_level| match new_consistency_level {
542 StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
543 StateTableOpConsistencyLevel::ConsistentOldValue => {
544 consistent_old_value_op(self.row_serde.clone(), false)
545 }
546 StateTableOpConsistencyLevel::LogStoreEnabled => {
547 consistent_old_value_op(self.row_serde.clone(), true)
548 }
549 });
550 self.state_store.seal_current_epoch(
551 next_epoch,
552 SealCurrentEpochOptions {
553 table_watermarks,
554 switch_op_consistency_level,
555 },
556 );
557 Ok(())
558 }
559}
560
/// Operation consistency level requested for a state table.
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
    /// No old-value checking (maps to `OpConsistencyLevel::Inconsistent`).
    Inconsistent,
    /// Old values are checked against the stored values by `consistent_old_value_op`,
    /// with `is_log_store = false`.
    ConsistentOldValue,
    /// Like `ConsistentOldValue`, but with `is_log_store = true`.
    LogStoreEnabled,
}
573
/// Builder of [`StateTableInner`].
///
/// `PreloadAllRow` is a typestate: it is `()` until the row-preloading decision is made
/// (via `enable_preload_all_rows_by_config` or `forbid_preload_all_rows`), then `bool`.
/// `build` is only available in the `bool` state.
pub struct StateTableBuilder<S, SD, const IS_REPLICATED: bool, PreloadAllRow> {
    /// Id of the table.
    table_id: TableId,
    /// Name used only in diagnostic messages.
    table_name_for_debug: String,
    /// Descriptors of all table columns.
    table_columns: Vec<ColumnDesc>,
    /// Order type of each pk column.
    order_types: Vec<OrderType>,
    /// Indices of the pk columns within the table's columns.
    pk_indices: Vec<usize>,
    /// Positions of the distribution key columns within the pk.
    dist_key_in_pk_indices: Vec<usize>,
    /// Position of the vnode column within the pk, if any.
    vnode_col_idx_in_pk: Option<usize>,
    /// Vnode count the resulting distribution must have.
    expected_vnode_count: usize,
    /// Indices of the columns stored in the value.
    value_indices: Vec<usize>,
    /// Number of leading pk columns used for the read prefix hint.
    prefix_hint_len: usize,
    /// Optional retention for `TableOption`.
    retention_seconds: Option<u32>,
    /// Whether the table is versioned; must match a column-aware value serde.
    versioned: bool,
    /// Fragment owning this table.
    fragment_id: FragmentId,
    /// Index of the clean-watermark column, if any.
    clean_watermark_index: Option<usize>,

    /// The shared state store.
    store: S,
    /// Owned vnodes; `None` is passed through to `TableDistribution::new`.
    vnodes: Option<Arc<Bitmap>>,
    /// Requested consistency level; defaults to `ConsistentOldValue` when unset.
    op_consistency_level: Option<StateTableOpConsistencyLevel>,
    /// Output column ids; defaults to empty when unset.
    output_column_ids: Option<Vec<ColumnId>>,
    /// Typestate for the preloading decision (`()` or `bool`).
    preload_all_rows: PreloadAllRow,
    /// Whether vnode key statistics were requested; `None` means not requested.
    enable_vnode_key_stats: Option<bool>,
    /// Whether vnode statistics may be used for pruning.
    enable_state_table_vnode_stats_pruning: bool,
    /// Optional metrics for the table.
    metrics: Option<StateTableMetrics>,

    _serde: PhantomData<SD>,
}
605
606impl<S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
607 StateTableBuilder<S, SD, IS_REPLICATED, ()>
608{
609 fn with_preload_all_rows(
610 self,
611 preload_all_rows: bool,
612 ) -> StateTableBuilder<S, SD, IS_REPLICATED, bool> {
613 StateTableBuilder {
614 table_id: self.table_id,
615 table_name_for_debug: self.table_name_for_debug,
616 table_columns: self.table_columns,
617 order_types: self.order_types,
618 pk_indices: self.pk_indices,
619 dist_key_in_pk_indices: self.dist_key_in_pk_indices,
620 vnode_col_idx_in_pk: self.vnode_col_idx_in_pk,
621 expected_vnode_count: self.expected_vnode_count,
622 value_indices: self.value_indices,
623 prefix_hint_len: self.prefix_hint_len,
624 retention_seconds: self.retention_seconds,
625 versioned: self.versioned,
626 fragment_id: self.fragment_id,
627 clean_watermark_index: self.clean_watermark_index,
628 store: self.store,
629 vnodes: self.vnodes,
630 op_consistency_level: self.op_consistency_level,
631 output_column_ids: self.output_column_ids,
632 preload_all_rows,
633 enable_vnode_key_stats: self.enable_vnode_key_stats,
634 enable_state_table_vnode_stats_pruning: self.enable_state_table_vnode_stats_pruning,
635 metrics: self.metrics,
636 _serde: Default::default(),
637 }
638 }
639
640 pub fn enable_preload_all_rows_by_config(
641 self,
642 config: &StreamingConfig,
643 ) -> StateTableBuilder<S, SD, IS_REPLICATED, bool> {
644 let developer = &config.developer;
645 let preload_all_rows = if developer.default_enable_mem_preload_state_table {
646 !developer
647 .mem_preload_state_table_ids_blacklist
648 .contains(&self.table_id.as_raw_id())
649 } else {
650 developer
651 .mem_preload_state_table_ids_whitelist
652 .contains(&self.table_id.as_raw_id())
653 };
654 self.with_preload_all_rows(preload_all_rows)
655 }
656
657 pub fn forbid_preload_all_rows(self) -> StateTableBuilder<S, SD, IS_REPLICATED, bool> {
658 self.with_preload_all_rows(false)
659 }
660}
661
662impl<S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool, PreloadAllRow>
663 StateTableBuilder<S, SD, IS_REPLICATED, PreloadAllRow>
664{
665 pub fn with_op_consistency_level(
666 mut self,
667 op_consistency_level: StateTableOpConsistencyLevel,
668 ) -> Self {
669 self.op_consistency_level = Some(op_consistency_level);
670 self
671 }
672
673 pub fn with_output_column_ids(mut self, output_column_ids: Vec<ColumnId>) -> Self {
674 self.output_column_ids = Some(output_column_ids);
675 self
676 }
677
678 pub fn enable_vnode_key_stats(mut self, enable: bool, config: &StreamingConfig) -> Self {
679 self.enable_vnode_key_stats = Some(enable);
680 self.enable_state_table_vnode_stats_pruning =
681 enable && config.developer.enable_state_table_vnode_stats_pruning;
682 self
683 }
684
685 pub fn with_metrics(mut self, metrics: StateTableMetrics) -> Self {
686 self.metrics = Some(metrics);
687 self
688 }
689}
690
691impl<S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
692 StateTableBuilder<S, SD, IS_REPLICATED, bool>
693{
694 pub async fn build(self) -> StateTableInner<S, SD, IS_REPLICATED> {
695 let mut preload_all_rows = self.preload_all_rows;
696 if preload_all_rows
697 && let Err(e) =
698 risingwave_common::license::Feature::StateTableMemoryPreload.check_available()
699 {
700 warn!(table_id=%self.table_id, e=%e.as_report(), "table configured to preload rows to memory but disabled by license");
701 preload_all_rows = false;
702 }
703
704 let should_enable_vnode_key_stats = if preload_all_rows
705 && let Some(enable_vnode_key_stats) = self.enable_vnode_key_stats
706 && enable_vnode_key_stats
707 {
708 false
709 } else {
710 self.enable_vnode_key_stats.unwrap_or(false)
711 };
712 self.build_inner(preload_all_rows, should_enable_vnode_key_stats)
713 .await
714 }
715}
716
717impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
722where
723 S: StateStore,
724 SD: ValueRowSerde,
725{
726 #[cfg(any(test, feature = "test"))]
730 pub async fn from_table_catalog(
731 table_catalog: &Table,
732 store: S,
733 vnodes: Option<Arc<Bitmap>>,
734 ) -> Self {
735 StateTableBuilder::new(table_catalog, store, vnodes)
736 .forbid_preload_all_rows()
737 .build()
738 .await
739 }
740
741 pub async fn from_table_catalog_inconsistent_op(
743 table_catalog: &Table,
744 store: S,
745 vnodes: Option<Arc<Bitmap>>,
746 ) -> Self {
747 StateTableBuilder::new(table_catalog, store, vnodes)
748 .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
749 .forbid_preload_all_rows()
750 .build()
751 .await
752 }
753}
754
755impl<S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
756 StateTableBuilder<S, SD, IS_REPLICATED, ()>
757{
758 pub fn new(table_catalog: &Table, store: S, vnodes: Option<Arc<Bitmap>>) -> Self {
759 let table_id = table_catalog.id;
760 let table_columns: Vec<ColumnDesc> = table_catalog
761 .columns
762 .iter()
763 .map(|col| col.column_desc.as_ref().unwrap().into())
764 .collect();
765 let order_types: Vec<OrderType> = table_catalog
766 .pk
767 .iter()
768 .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
769 .collect();
770 let dist_key_indices: Vec<usize> = table_catalog
771 .distribution_key
772 .iter()
773 .map(|dist_index| *dist_index as usize)
774 .collect();
775
776 let pk_indices = table_catalog
777 .pk
778 .iter()
779 .map(|col_order| col_order.column_index as usize)
780 .collect_vec();
781
782 let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() {
784 get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices).unwrap()
785 } else {
786 table_catalog
787 .get_dist_key_in_pk()
788 .iter()
789 .map(|idx| *idx as usize)
790 .collect()
791 };
792
793 let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
794 let vnode_col_idx = *idx as usize;
795 pk_indices.iter().position(|&i| vnode_col_idx == i)
796 });
797 let value_indices = table_catalog
798 .value_indices
799 .iter()
800 .map(|val| *val as usize)
801 .collect_vec();
802 let clean_watermark_indices = table_catalog.get_clean_watermark_column_indices();
803 if clean_watermark_indices.len() > 1 {
804 unimplemented!("multiple clean watermark columns are not supported yet")
805 }
806 let clean_watermark_index = clean_watermark_indices.first().map(|&i| i as usize);
807
808 Self {
809 table_id,
810 table_name_for_debug: table_catalog.name.clone(),
811 table_columns,
812 order_types,
813 pk_indices,
814 dist_key_in_pk_indices,
815 vnode_col_idx_in_pk,
816 expected_vnode_count: table_catalog.vnode_count(),
817 value_indices,
818 prefix_hint_len: table_catalog.read_prefix_len_hint as usize,
819 retention_seconds: table_catalog.retention_seconds,
820 versioned: table_catalog.version.is_some(),
821 fragment_id: table_catalog.fragment_id,
822 clean_watermark_index,
823 store,
824 vnodes,
825 op_consistency_level: None,
826 output_column_ids: None,
827 preload_all_rows: (),
828 enable_vnode_key_stats: None,
829 enable_state_table_vnode_stats_pruning: false,
830 metrics: None,
831 _serde: Default::default(),
832 }
833 }
834}
835
impl<S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
    StateTableBuilder<S, SD, IS_REPLICATED, bool>
{
    /// Consumes the builder and constructs the state table.
    ///
    /// `preload_all_rows` and `should_enable_vnode_key_stats` are the already-resolved
    /// switches computed by `build`. This creates the local state store via
    /// `self.store.new_local` and seeds `committed_watermark` from the table watermark
    /// persisted in it.
    ///
    /// # Panics
    ///
    /// - Replicated table with `prefix_hint_len > 0` whose distribution key positions are
    ///   not all inside the prefix hint.
    /// - Distribution vnode count differs from `expected_vnode_count`.
    /// - `versioned` disagrees with whether the value serde is column-aware.
    /// - Replicated table whose pk columns are not all in `output_column_ids`.
    async fn build_inner(
        self,
        preload_all_rows: bool,
        should_enable_vnode_key_stats: bool,
    ) -> StateTableInner<S, SD, IS_REPLICATED> {
        let table_id = self.table_id;
        let table_columns = self.table_columns;
        let order_types = self.order_types;
        let pk_indices = self.pk_indices;
        let dist_key_in_pk_indices = self.dist_key_in_pk_indices;
        let vnode_col_idx_in_pk = self.vnode_col_idx_in_pk;
        let prefix_hint_len = self.prefix_hint_len;
        let metrics = self.metrics;

        // Unless explicitly configured, old values are checked for consistency.
        let op_consistency_level = self
            .op_consistency_level
            .unwrap_or(StateTableOpConsistencyLevel::ConsistentOldValue);

        // No output columns configured means an empty output projection.
        let output_column_ids = self.output_column_ids.unwrap_or_default();

        let data_types: Vec<DataType> = table_columns
            .iter()
            .map(|col| col.data_type.clone())
            .collect();

        if IS_REPLICATED && prefix_hint_len > 0 {
            // For replicated tables every distribution key column must lie within the
            // prefix hint.
            assert!(
                dist_key_in_pk_indices.iter().all(|&d| d < prefix_hint_len),
                "replicated state table: distribution key indices {:?} must all be covered by \
                 prefix_hint_len {}",
                dist_key_in_pk_indices,
                prefix_hint_len,
            );
        }

        let distribution =
            TableDistribution::new(self.vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
        assert_eq!(
            distribution.vnode_count(),
            self.expected_vnode_count,
            "vnode count mismatch, scanning table {} under wrong distribution?",
            self.table_name_for_debug,
        );

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);

        let input_value_indices = self.value_indices;

        let no_shuffle_value_indices = (0..table_columns.len()).collect_vec();

        // `None` encodes the common case where the value holds all columns in order.
        let value_indices = match input_value_indices.len() == table_columns.len()
            && input_value_indices == no_shuffle_value_indices
        {
            true => None,
            false => Some(input_value_indices.clone()),
        };

        let row_serde = Arc::new(SD::new(
            Arc::from_iter(input_value_indices.iter().copied()),
            Arc::from(table_columns.clone().into_boxed_slice()),
        ));

        // Translate the table-level consistency level into the store-level one.
        let state_table_op_consistency_level = op_consistency_level;
        let op_consistency_level = match state_table_op_consistency_level {
            StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
            StateTableOpConsistencyLevel::ConsistentOldValue => {
                consistent_old_value_op(row_serde.clone(), false)
            }
            StateTableOpConsistencyLevel::LogStoreEnabled => {
                consistent_old_value_op(row_serde.clone(), true)
            }
        };

        let table_option = TableOption::new(self.retention_seconds);
        let new_local_options = if IS_REPLICATED {
            NewLocalOptions::new_replicated(
                table_id,
                self.fragment_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
            )
        } else {
            NewLocalOptions::new(
                table_id,
                self.fragment_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
                true,
            )
        };
        let local_state_store = self.store.new_local(new_local_options).await;

        assert_eq!(self.versioned, row_serde.kind().is_column_aware());

        // column id -> position in the output.
        let output_column_ids_to_input_idx = output_column_ids
            .iter()
            .enumerate()
            .map(|(pos, id)| (*id, pos))
            .collect::<HashMap<_, _>>();

        let columns = table_columns;

        // Table column index -> output position (`None` if the column is not output).
        let mut i2o_mapping = vec![None; columns.len()];
        for (i, column) in columns.iter().enumerate() {
            if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
                i2o_mapping[i] = Some(*pos);
            }
        }
        let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());

        let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);

        if IS_REPLICATED {
            assert!(
                pk_indices
                    .iter()
                    .all(|&pk_idx| output_indices.contains(&pk_idx)),
                "all pk columns must be included in output_column_ids for replicated state table"
            );
        }

        // Choose how the clean-watermark column is serialized, based on where it sits:
        // first pk column, another pk column, or a value-only column.
        let clean_watermark_index = self.clean_watermark_index;
        let watermark_serde = clean_watermark_index.map(|idx| {
            let pk_idx = pk_indices.iter().position(|&i| i == idx);
            let (watermark_serde, watermark_serde_type) = match pk_idx {
                Some(0) => (pk_serde.index(0).into_owned(), WatermarkSerdeType::PkPrefix),
                Some(pk_idx) => (
                    pk_serde.index(pk_idx).into_owned(),
                    WatermarkSerdeType::NonPkPrefix,
                ),
                None => (
                    OrderedRowSerde::new(
                        vec![data_types[idx].clone()],
                        vec![OrderType::ascending()],
                    ),
                    WatermarkSerdeType::Value,
                ),
            };
            (watermark_serde, watermark_serde_type)
        });

        // Seed the committed watermark with the largest persisted per-vnode watermark.
        // Undecodable watermarks are logged and skipped.
        let committed_watermark = if let Some((deser, _)) = watermark_serde.as_ref() {
            distribution
                .vnodes()
                .iter_vnodes()
                .filter_map(|vnode| {
                    let bytes = local_state_store.get_table_watermark(vnode)?;
                    let datum = deser.deserialize(&bytes).ok().and_then(|row| {
                        assert!(row.len() == 1);
                        row[0].clone()
                    });
                    if datum.is_none() {
                        tracing::error!(
                            ?vnode,
                            watermark = ?bytes,
                            "Failed to deserialize persisted watermark from state store.",
                        );
                    }
                    datum
                })
                .max_by(|a, b| cmp_datum(Some(a), Some(b), OrderType::ascending()))
        } else {
            None
        };

        StateTableInner {
            table_id,
            row_store: StateTableRowStore {
                all_rows: preload_all_rows.then(HashMap::new),
                state_store: local_state_store,
                row_serde,
                pk_serde: pk_serde.clone(),
                table_id,
                vnode_stats: should_enable_vnode_key_stats.then(HashMap::new),
                enable_state_table_vnode_stats_pruning: self.enable_state_table_vnode_stats_pruning,
                metrics,
            },
            store: self.store,
            epoch: None,
            pk_serde,
            pk_indices,
            distribution,
            prefix_hint_len,
            value_indices,
            pending_watermark: None,
            committed_watermark,
            watermark_serde,
            data_types,
            output_indices,
            i2o_mapping,
            op_consistency_level: state_table_op_consistency_level,
            clean_watermark_index,
            on_post_commit: false,
        }
    }
}
1060
1061impl<S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
1062 StateTableBuilder<S, SD, IS_REPLICATED, ()>
1063{
1064 pub fn new_from_storage_table_desc(
1065 table_desc: &StorageTableDesc,
1066 store: S,
1067 vnodes: Option<Arc<Bitmap>>,
1068 fragment_id: u32,
1069 ) -> Self {
1070 let table_id = table_desc.table_id;
1071 let table_columns: Vec<ColumnDesc> =
1072 table_desc.columns.iter().map(ColumnDesc::from).collect();
1073 let order_types: Vec<OrderType> = table_desc
1074 .pk
1075 .iter()
1076 .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
1077 .collect();
1078 let pk_indices = table_desc
1079 .pk
1080 .iter()
1081 .map(|col_order| col_order.column_index as usize)
1082 .collect_vec();
1083 let dist_key_in_pk_indices = table_desc
1084 .dist_key_in_pk_indices
1085 .iter()
1086 .map(|&idx| idx as usize)
1087 .collect();
1088 let vnode_col_idx_in_pk = table_desc.vnode_col_idx_in_pk.map(|k| k as usize);
1091 let raw_value_indices = table_desc
1092 .value_indices
1093 .iter()
1094 .map(|val| *val as usize)
1095 .collect_vec();
1096
1097 Self {
1098 table_id,
1099 table_name_for_debug: table_id.to_string(),
1100 table_columns,
1101 order_types,
1102 pk_indices,
1103 dist_key_in_pk_indices,
1104 vnode_col_idx_in_pk,
1105 expected_vnode_count: table_desc.vnode_count(),
1106 value_indices: raw_value_indices,
1107 prefix_hint_len: table_desc.read_prefix_len_hint as usize,
1108 retention_seconds: table_desc.retention_seconds,
1109 versioned: table_desc.versioned,
1110 fragment_id: fragment_id.into(),
1111 clean_watermark_index: None,
1112 store,
1113 vnodes,
1114 op_consistency_level: None,
1115 output_column_ids: None,
1116 preload_all_rows: (),
1117 enable_vnode_key_stats: None,
1118 enable_state_table_vnode_stats_pruning: false,
1119 metrics: None,
1120 _serde: Default::default(),
1121 }
1122 }
1123}
1124
1125impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
1126where
1127 S: StateStore,
1128 SD: ValueRowSerde,
1129{
1130 pub fn get_data_types(&self) -> &[DataType] {
1131 &self.data_types
1132 }
1133
1134 pub fn table_id(&self) -> TableId {
1135 self.table_id
1136 }
1137
1138 fn compute_prefix_vnode(&self, pk_prefix: &impl Row) -> VirtualNode {
1140 self.distribution
1141 .try_compute_vnode_by_pk_prefix(pk_prefix)
1142 .expect("For streaming, the given prefix must be enough to calculate the vnode")
1143 }
1144
1145 pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
1147 self.distribution.compute_vnode_by_pk(pk)
1148 }
1149
1150 pub fn pk_indices(&self) -> &[usize] {
1153 &self.pk_indices
1154 }
1155
1156 pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
1160 assert!(IS_REPLICATED);
1161 self.pk_indices
1162 .iter()
1163 .map(|&i| self.output_indices.iter().position(|&j| i == j))
1164 .collect()
1165 }
1166
1167 pub fn pk_serde(&self) -> &OrderedRowSerde {
1168 &self.pk_serde
1169 }
1170
1171 pub fn vnodes(&self) -> &Arc<Bitmap> {
1172 self.distribution.vnodes()
1173 }
1174
1175 pub fn value_indices(&self) -> &Option<Vec<usize>> {
1176 &self.value_indices
1177 }
1178
1179 pub fn is_consistent_op(&self) -> bool {
1180 matches!(
1181 self.op_consistency_level,
1182 StateTableOpConsistencyLevel::ConsistentOldValue
1183 | StateTableOpConsistencyLevel::LogStoreEnabled
1184 )
1185 }
1186
1187 pub fn metrics(&self) -> Option<&StateTableMetrics> {
1188 self.row_store.metrics.as_ref()
1189 }
1190}
1191
1192impl<S, SD> StateTableInner<S, SD, true>
1193where
1194 S: StateStore,
1195 SD: ValueRowSerde,
1196{
1197 pub async fn new_replicated(
1199 table_catalog: &Table,
1200 store: S,
1201 vnodes: Option<Arc<Bitmap>>,
1202 output_column_ids: Vec<ColumnId>,
1203 ) -> Self {
1204 StateTableBuilder::new(table_catalog, store, vnodes)
1207 .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
1208 .with_output_column_ids(output_column_ids)
1209 .forbid_preload_all_rows()
1210 .build()
1211 .await
1212 }
1213}
1214
1215impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
1217where
1218 S: StateStore,
1219 SD: ValueRowSerde,
1220{
1221 pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
1223 let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
1224 let row = self.row_store.get(serialized_pk, prefix_hint).await?;
1225 match row {
1226 Some(row) => {
1227 if IS_REPLICATED {
1228 let row = row.project(&self.output_indices);
1231 Ok(Some(row.into_owned_row()))
1232 } else {
1233 Ok(Some(row))
1234 }
1235 }
1236 None => Ok(None),
1237 }
1238 }
1239
1240 pub async fn exists(&self, pk: impl Row) -> StreamExecutorResult<bool> {
1242 let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
1243 self.row_store.exists(serialized_pk, prefix_hint).await
1244 }
1245
1246 fn serialize_pk(&self, pk: &impl Row) -> TableKey<Bytes> {
1247 assert!(pk.len() <= self.pk_indices.len());
1248 serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk))
1249 }
1250
1251 fn serialize_pk_and_get_prefix_hint(&self, pk: &impl Row) -> (TableKey<Bytes>, Option<Bytes>) {
1252 let serialized_pk = self.serialize_pk(&pk);
1253 let prefix_hint = if should_calculate_prefix_hint(self.prefix_hint_len, pk.len(), false) {
1254 Some(serialized_pk.slice(VirtualNode::SIZE..))
1255 } else {
1256 #[cfg(debug_assertions)]
1257 if self.prefix_hint_len != 0 {
1258 warn!(
1259 "prefix_hint_len is not equal to pk.len(), may not be able to utilize bloom filter"
1260 );
1261 }
1262 None
1263 };
1264 (serialized_pk, prefix_hint)
1265 }
1266}
1267
1268impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
1269 async fn get(
1270 &self,
1271 key_bytes: TableKey<Bytes>,
1272 prefix_hint: Option<Bytes>,
1273 ) -> StreamExecutorResult<Option<OwnedRow>> {
1274 if let Some(m) = &self.metrics {
1275 m.get_count.inc();
1276 }
1277 if let Some(rows) = &self.all_rows {
1278 let (vnode, key) = key_bytes.split_vnode_bytes();
1279 return Ok(rows.get(&vnode).expect("covered vnode").get(&key).cloned());
1280 }
1281
1282 let should_prune = if let Some(stats) = &self.vnode_stats
1284 && let (vnode, key) = key_bytes.split_vnode_bytes()
1285 && let Some(vnode_stat) = stats.get(&vnode)
1286 && vnode_stat.can_prune(&key)
1287 {
1288 if let Some(m) = &self.metrics {
1289 m.get_vnode_pruned_count.inc();
1290 }
1291 true
1292 } else {
1293 false
1294 };
1295
1296 if should_prune && self.enable_state_table_vnode_stats_pruning {
1297 return Ok(None);
1298 }
1299
1300 let read_options = ReadOptions {
1301 prefix_hint,
1302 cache_policy: CachePolicy::Fill(Hint::Normal),
1303 ..Default::default()
1304 };
1305
1306 let result = self
1307 .state_store
1308 .on_key_value(key_bytes, read_options, move |_, value| {
1309 let row = self.row_serde.deserialize(value)?;
1310 Ok(OwnedRow::new(row))
1311 })
1312 .await
1313 .map_err(Into::<StreamExecutorError>::into)?;
1314
1315 if should_prune && result.is_some() {
1317 tracing::warn!(
1318 table_id = %self.table_id,
1319 "vnode stats pruning dry run fails for get. This will not affect correctness."
1320 );
1321 }
1322
1323 Ok(result)
1324 }
1325
1326 async fn exists(
1327 &self,
1328 key_bytes: TableKey<Bytes>,
1329 prefix_hint: Option<Bytes>,
1330 ) -> StreamExecutorResult<bool> {
1331 if let Some(m) = &self.metrics {
1332 m.get_count.inc();
1333 }
1334 if let Some(rows) = &self.all_rows {
1335 let (vnode, key) = key_bytes.split_vnode_bytes();
1336 return Ok(rows.get(&vnode).expect("covered vnode").contains_key(&key));
1337 }
1338
1339 let should_prune = if let Some(stats) = &self.vnode_stats
1341 && let (vnode, key) = key_bytes.split_vnode_bytes()
1342 && let Some(vnode_stat) = stats.get(&vnode)
1343 && vnode_stat.can_prune(&key)
1344 {
1345 if let Some(m) = &self.metrics {
1346 m.get_vnode_pruned_count.inc();
1347 }
1348 true
1349 } else {
1350 false
1351 };
1352
1353 if should_prune && self.enable_state_table_vnode_stats_pruning {
1354 return Ok(false);
1355 }
1356
1357 let read_options = ReadOptions {
1358 prefix_hint,
1359 cache_policy: CachePolicy::Fill(Hint::Normal),
1360 ..Default::default()
1361 };
1362 let result = self
1363 .state_store
1364 .on_key_value(key_bytes, read_options, move |_, _| Ok(()))
1365 .await?;
1366 let exists = result.is_some();
1367
1368 if should_prune && exists {
1370 tracing::warn!(
1371 table_id = %self.table_id,
1372 "vnode stats pruning dry run fails for exists. This will not affect correctness."
1373 );
1374 }
1375
1376 Ok(exists)
1377 }
1378}
1379
1380#[must_use]
1395pub struct StateTablePostCommit<'a, S, SD = BasicSerde, const IS_REPLICATED: bool = false>
1396where
1397 S: StateStore,
1398 SD: ValueRowSerde,
1399{
1400 inner: &'a mut StateTableInner<S, SD, IS_REPLICATED>,
1401}
1402
1403impl<'a, S, SD, const IS_REPLICATED: bool> StateTablePostCommit<'a, S, SD, IS_REPLICATED>
1404where
1405 S: StateStore,
1406 SD: ValueRowSerde,
1407{
1408 pub async fn post_yield_barrier(
1413 mut self,
1414 new_vnodes: Option<Arc<Bitmap>>,
1415 ) -> StreamExecutorResult<
1416 Option<(
1417 (
1418 Arc<Bitmap>,
1419 Arc<Bitmap>,
1420 &'a mut StateTableInner<S, SD, IS_REPLICATED>,
1421 ),
1422 bool,
1423 )>,
1424 > {
1425 self.inner.on_post_commit = false;
1426 Ok(if let Some(new_vnodes) = new_vnodes {
1427 let (old_vnodes, keyed_cache_may_stale) =
1428 self.update_vnode_bitmap(new_vnodes.clone()).await?;
1429 Some(((new_vnodes, old_vnodes, self.inner), keyed_cache_may_stale))
1430 } else {
1431 None
1432 })
1433 }
1434
1435 pub fn inner(&self) -> &StateTableInner<S, SD, IS_REPLICATED> {
1436 &*self.inner
1437 }
1438
1439 async fn update_vnode_bitmap(
1441 &mut self,
1442 new_vnodes: Arc<Bitmap>,
1443 ) -> StreamExecutorResult<(Arc<Bitmap>, bool)> {
1444 let prev_vnodes = self
1445 .inner
1446 .row_store
1447 .update_vnode_bitmap(new_vnodes.clone())
1448 .await?;
1449 assert_eq!(
1450 &prev_vnodes,
1451 self.inner.vnodes(),
1452 "state table and state store vnode bitmap mismatches"
1453 );
1454
1455 if self.inner.distribution.is_singleton() {
1456 assert_eq!(
1457 &new_vnodes,
1458 self.inner.vnodes(),
1459 "should not update vnode bitmap for singleton table"
1460 );
1461 }
1462 assert_eq!(self.inner.vnodes().len(), new_vnodes.len());
1463
1464 let keyed_cache_may_stale = keyed_cache_may_stale(self.inner.vnodes(), &new_vnodes);
1465
1466 if keyed_cache_may_stale {
1467 self.inner.pending_watermark = None;
1468 }
1469
1470 Ok((
1471 self.inner.distribution.update_vnode_bitmap(new_vnodes),
1472 keyed_cache_may_stale,
1473 ))
1474 }
1475}
1476
1477impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
1479 fn handle_mem_table_error(&self, e: StorageError) {
1480 let e = match e.into_inner() {
1481 ErrorKind::MemTable(e) => e,
1482 _ => unreachable!("should only get memtable error"),
1483 };
1484 match *e {
1485 MemTableError::InconsistentOperation { key, prev, new, .. } => {
1486 let (vnode, key) = deserialize_pk_with_vnode(&key, &self.pk_serde).unwrap();
1487 panic!(
1488 "mem-table operation inconsistent! table_id: {}, vnode: {}, key: {:?}, prev: {}, new: {}",
1489 self.table_id,
1490 vnode,
1491 &key,
1492 prev.debug_fmt(&*self.row_serde),
1493 new.debug_fmt(&*self.row_serde),
1494 )
1495 }
1496 }
1497 }
1498
1499 fn insert(&mut self, key: TableKey<Bytes>, value: impl Row) {
1500 insane_mode_discard_point!();
1501 let value_bytes = self.row_serde.serialize(&value).into();
1502
1503 let (vnode, key_without_vnode) = key.split_vnode_bytes();
1504
1505 if self.all_rows.is_none()
1507 && let Some(stats) = &mut self.vnode_stats
1508 && let Some(vnode_stat) = stats.get_mut(&vnode)
1509 {
1510 vnode_stat.update_with_key(&key_without_vnode);
1511 }
1512
1513 if let Some(rows) = &mut self.all_rows {
1514 rows.get_mut(&vnode)
1515 .expect("covered vnode")
1516 .insert(key_without_vnode, value.into_owned_row());
1517 }
1518 self.state_store
1519 .insert(key, value_bytes, None)
1520 .unwrap_or_else(|e| self.handle_mem_table_error(e));
1521 }
1522
1523 fn delete(&mut self, key: TableKey<Bytes>, value: impl Row) {
1524 insane_mode_discard_point!();
1525 let value_bytes = self.row_serde.serialize(value).into();
1526
1527 let (vnode, key_without_vnode) = key.split_vnode_bytes();
1528
1529 if self.all_rows.is_none()
1530 && let Some(stats) = &mut self.vnode_stats
1531 && let Some(vnode_stat) = stats.get_mut(&vnode)
1532 {
1533 vnode_stat.update_with_key(&key_without_vnode);
1534 }
1535
1536 if let Some(rows) = &mut self.all_rows {
1537 rows.get_mut(&vnode)
1538 .expect("covered vnode")
1539 .remove(&key_without_vnode);
1540 }
1541 self.state_store
1542 .delete(key, value_bytes)
1543 .unwrap_or_else(|e| self.handle_mem_table_error(e));
1544 }
1545
1546 fn update(&mut self, key_bytes: TableKey<Bytes>, old_value: impl Row, new_value: impl Row) {
1547 insane_mode_discard_point!();
1548 let new_value_bytes = self.row_serde.serialize(&new_value).into();
1549 let old_value_bytes = self.row_serde.serialize(old_value).into();
1550
1551 let (vnode, key_without_vnode) = key_bytes.split_vnode_bytes();
1552
1553 if self.all_rows.is_none()
1556 && let Some(stats) = &mut self.vnode_stats
1557 && let Some(vnode_stat) = stats.get_mut(&vnode)
1558 {
1559 vnode_stat.update_with_key(&key_without_vnode);
1560 }
1561
1562 if let Some(rows) = &mut self.all_rows {
1563 rows.get_mut(&vnode)
1564 .expect("covered vnode")
1565 .insert(key_without_vnode, new_value.into_owned_row());
1566 }
1567 self.state_store
1568 .insert(key_bytes, new_value_bytes, Some(old_value_bytes))
1569 .unwrap_or_else(|e| self.handle_mem_table_error(e));
1570 }
1571}
1572
1573impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
1574where
1575 S: StateStore,
1576 SD: ValueRowSerde,
1577{
1578 pub fn insert(&mut self, value: impl Row) {
1581 let pk_indices = &self.pk_indices;
1582 let pk = (&value).project(pk_indices);
1583
1584 let key_bytes = self.serialize_pk(&pk);
1585 dispatch_value_indices!(&self.value_indices, [value], {
1586 self.row_store.insert(key_bytes, value)
1587 })
1588 }
1589
1590 pub fn delete(&mut self, old_value: impl Row) {
1593 let pk_indices = &self.pk_indices;
1594 let pk = (&old_value).project(pk_indices);
1595
1596 let key_bytes = self.serialize_pk(&pk);
1597 dispatch_value_indices!(&self.value_indices, [old_value], {
1598 self.row_store.delete(key_bytes, old_value)
1599 })
1600 }
1601
1602 pub fn update(&mut self, old_value: impl Row, new_value: impl Row) {
1604 let old_pk = (&old_value).project(self.pk_indices());
1605 let new_pk = (&new_value).project(self.pk_indices());
1606 debug_assert!(
1607 Row::eq(&old_pk, new_pk),
1608 "pk should not change: {old_pk:?} vs {new_pk:?}. {}",
1609 self.table_id
1610 );
1611
1612 let key_bytes = self.serialize_pk(&new_pk);
1613 dispatch_value_indices!(&self.value_indices, [old_value, new_value], {
1614 self.row_store.update(key_bytes, old_value, new_value)
1615 })
1616 }
1617
1618 pub fn write_record(&mut self, record: Record<impl Row>) {
1620 match record {
1621 Record::Insert { new_row } => self.insert(new_row),
1622 Record::Delete { old_row } => self.delete(old_row),
1623 Record::Update { old_row, new_row } => self.update(old_row, new_row),
1624 }
1625 }
1626
1627 fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
1628 fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
1629 }
1630
1631 #[allow(clippy::disallowed_methods)]
1634 pub fn write_chunk(&mut self, chunk: StreamChunk) {
1635 let chunk = if IS_REPLICATED {
1636 self.fill_non_output_indices(chunk)
1637 } else {
1638 chunk
1639 };
1640
1641 let vnodes = self
1642 .distribution
1643 .compute_chunk_vnode(&chunk, &self.pk_indices);
1644
1645 for (idx, optional_row) in chunk.rows_with_holes().enumerate() {
1646 let Some((op, row)) = optional_row else {
1647 continue;
1648 };
1649 let pk = row.project(&self.pk_indices);
1650 let vnode = vnodes[idx];
1651 let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, vnode);
1652 match op {
1653 Op::Insert | Op::UpdateInsert => {
1654 dispatch_value_indices!(&self.value_indices, [row], {
1655 self.row_store.insert(key_bytes, row);
1656 });
1657 }
1658 Op::Delete | Op::UpdateDelete => {
1659 dispatch_value_indices!(&self.value_indices, [row], {
1660 self.row_store.delete(key_bytes, row);
1661 });
1662 }
1663 }
1664 }
1665 }
1666
1667 pub fn update_watermark(&mut self, watermark: ScalarImpl) {
1673 trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark");
1674 self.pending_watermark = Some(watermark);
1675 }
1676
1677 pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> {
1680 self.committed_watermark.as_ref()
1681 }
1682
1683 pub async fn commit(
1684 &mut self,
1685 new_epoch: EpochPair,
1686 ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
1687 self.commit_inner(new_epoch, None).await
1688 }
1689
1690 #[cfg(test)]
1691 pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
1692 self.commit_assert_no_update_vnode_bitmap(new_epoch).await
1693 }
1694
1695 pub async fn commit_assert_no_update_vnode_bitmap(
1696 &mut self,
1697 new_epoch: EpochPair,
1698 ) -> StreamExecutorResult<()> {
1699 let post_commit = self.commit_inner(new_epoch, None).await?;
1700 post_commit.post_yield_barrier(None).await?;
1701 Ok(())
1702 }
1703
1704 pub async fn commit_may_switch_consistent_op(
1705 &mut self,
1706 new_epoch: EpochPair,
1707 op_consistency_level: StateTableOpConsistencyLevel,
1708 ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
1709 if self.op_consistency_level != op_consistency_level {
1710 if !cfg!(debug_assertions) {
1712 info!(
1713 ?new_epoch,
1714 prev_op_consistency_level = ?self.op_consistency_level,
1715 ?op_consistency_level,
1716 table_id = %self.table_id,
1717 "switch to new op consistency level"
1718 );
1719 }
1720 self.commit_inner(new_epoch, Some(op_consistency_level))
1721 .await
1722 } else {
1723 self.commit_inner(new_epoch, None).await
1724 }
1725 }
1726
1727 async fn commit_inner(
1728 &mut self,
1729 new_epoch: EpochPair,
1730 switch_consistent_op: Option<StateTableOpConsistencyLevel>,
1731 ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
1732 assert!(!self.on_post_commit);
1733 assert_eq!(
1734 self.epoch.expect("should only be called after init").curr,
1735 new_epoch.prev
1736 );
1737 if let Some(new_consistency_level) = switch_consistent_op {
1738 assert_ne!(self.op_consistency_level, new_consistency_level);
1739 self.op_consistency_level = new_consistency_level;
1740 }
1741 trace!(
1742 table_id = %self.table_id,
1743 epoch = ?self.epoch,
1744 "commit state table"
1745 );
1746
1747 let table_watermarks = self.commit_pending_watermark();
1748 self.row_store
1749 .seal_current_epoch(new_epoch.curr, table_watermarks, switch_consistent_op)
1750 .await?;
1751 self.epoch = Some(new_epoch);
1752
1753 self.on_post_commit = true;
1754 Ok(StateTablePostCommit { inner: self })
1755 }
1756
1757 fn commit_pending_watermark(
1759 &mut self,
1760 ) -> Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)> {
1761 let watermark = self.pending_watermark.take()?;
1762 trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning");
1763
1764 assert!(
1765 !self.pk_indices().is_empty(),
1766 "see pending watermark on empty pk"
1767 );
1768 let (watermark_serializer, watermark_type) = self
1769 .watermark_serde
1770 .as_ref()
1771 .expect("watermark serde should be initialized to commit watermark");
1772 let watermark_suffix =
1773 serialize_row(row::once(Some(watermark.clone())), watermark_serializer);
1774 let vnode_watermark = VnodeWatermark::new(
1775 self.vnodes().clone(),
1776 Bytes::copy_from_slice(watermark_suffix.as_ref()),
1777 );
1778 trace!(table_id = %self.table_id, ?vnode_watermark, "table watermark");
1779
1780 let order_type = watermark_serializer.get_order_types().get(0).unwrap();
1781 let direction = if order_type.is_ascending() {
1782 WatermarkDirection::Ascending
1783 } else {
1784 WatermarkDirection::Descending
1785 };
1786
1787 self.committed_watermark = Some(watermark);
1788 Some((direction, vec![vnode_watermark], *watermark_type))
1789 }
1790
1791 pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
1792 self.row_store.try_flush().await?;
1793 Ok(())
1794 }
1795}
1796
1797pub trait RowStream<'a>: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a {}
1799impl<'a, S: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a> RowStream<'a> for S {}
1800
1801pub trait KeyedRowStream<'a>: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a {}
1802impl<'a, S: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a> KeyedRowStream<'a> for S {}
1803
1804pub trait PkRowStream<'a, K>: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a {}
1805impl<'a, K, S: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a> PkRowStream<'a, K> for S {}
1806
1807pub type BoxedRowStream<'a> = BoxStream<'a, StreamExecutorResult<OwnedRow>>;
1808
1809pub trait FromVnodeBytes {
1810 fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self;
1811}
1812
1813impl FromVnodeBytes for Bytes {
1814 fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self {
1815 prefix_slice_with_vnode(vnode, bytes)
1816 }
1817}
1818
1819impl FromVnodeBytes for () {
1820 fn from_vnode_bytes(_vnode: VirtualNode, _bytes: &Bytes) -> Self {}
1821}
1822
1823impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
1825where
1826 S: StateStore,
1827 SD: ValueRowSerde,
1828{
1829 pub async fn iter_with_vnode(
1832 &self,
1833
1834 vnode: VirtualNode,
1838 pk_range: &(Bound<impl Row>, Bound<impl Row>),
1839 prefetch_options: PrefetchOptions,
1840 ) -> StreamExecutorResult<impl RowStream<'_>> {
1841 Ok(self
1842 .iter_kv_with_pk_range::<()>(pk_range, vnode, prefetch_options)
1843 .await?
1844 .map_ok(|(_, row)| row))
1845 }
1846
1847 pub async fn iter_keyed_row_with_vnode(
1848 &self,
1849 vnode: VirtualNode,
1850 pk_range: &(Bound<impl Row>, Bound<impl Row>),
1851 prefetch_options: PrefetchOptions,
1852 ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
1853 Ok(self
1854 .iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
1855 .await?
1856 .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
1857 }
1858
1859 pub async fn iter_with_vnode_and_output_indices(
1860 &self,
1861 vnode: VirtualNode,
1862 pk_range: &(Bound<impl Row>, Bound<impl Row>),
1863 prefetch_options: PrefetchOptions,
1864 ) -> StreamExecutorResult<impl RowStream<'_>> {
1865 assert!(IS_REPLICATED);
1866 let stream = self
1867 .iter_with_vnode(vnode, pk_range, prefetch_options)
1868 .await?;
1869 Ok(stream.map(|row| row.map(|row| row.project(&self.output_indices).into_owned_row())))
1870 }
1871}
1872
1873impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
1874 async fn iter_kv<K: CopyFromSlice + FromVnodeBytes>(
1879 &self,
1880 vnode: VirtualNode,
1881 (start, end): (Bound<Bytes>, Bound<Bytes>),
1882 prefix_hint: Option<Bytes>,
1883 prefetch_options: PrefetchOptions,
1884 ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
1885 if let Some(m) = &self.metrics {
1886 m.iter_count.inc();
1887 }
1888 let (pruned_start, pruned_end, should_prune_entirely) = if let Some(stats) =
1890 &self.vnode_stats
1891 && let Some(vnode_stat) = stats.get(&vnode)
1892 {
1893 match vnode_stat.pruned_key_range(&start, &end) {
1894 Some((new_start, new_end)) => {
1895 if self.enable_state_table_vnode_stats_pruning {
1896 (new_start, new_end, false)
1897 } else {
1898 (start, end, false)
1900 }
1901 }
1902 None => {
1903 if let Some(m) = &self.metrics {
1904 m.iter_vnode_pruned_count.inc();
1905 }
1906 (start.clone(), end.clone(), true)
1908 }
1909 }
1910 } else {
1911 (start, end, false)
1912 };
1913
1914 if should_prune_entirely && self.enable_state_table_vnode_stats_pruning {
1915 return Ok(futures::future::Either::Left(futures::stream::empty()));
1916 }
1917
1918 let table_id = self.table_id;
1919 let inspect_fn = move |result: &StreamExecutorResult<(K, OwnedRow)>| {
1920 if should_prune_entirely && result.is_ok() {
1922 tracing::warn!(
1923 table_id = %table_id,
1924 "vnode stats pruning dry run fails for iter. This will not affect correctness."
1925 );
1926 }
1927 };
1928
1929 if let Some(rows) = &self.all_rows {
1930 return Ok(futures::future::Either::Right(
1931 futures::future::Either::Left(
1932 futures::stream::iter(
1933 rows.get(&vnode)
1934 .expect("covered vnode")
1935 .range((pruned_start, pruned_end))
1936 .map(move |(key, value)| {
1937 Ok((K::from_vnode_bytes(vnode, key), value.clone()))
1938 }),
1939 )
1940 .inspect(inspect_fn),
1941 ),
1942 ));
1943 }
1944 let read_options = ReadOptions {
1945 prefix_hint,
1946 prefetch_options,
1947 cache_policy: CachePolicy::Fill(Hint::Normal),
1948 };
1949
1950 Ok(futures::future::Either::Right(
1951 futures::future::Either::Right(
1952 deserialize_keyed_row_stream(
1953 self.state_store
1954 .iter(
1955 prefixed_range_with_vnode((pruned_start, pruned_end), vnode),
1956 read_options,
1957 )
1958 .await?,
1959 &*self.row_serde,
1960 )
1961 .inspect(inspect_fn),
1962 ),
1963 ))
1964 }
1965
1966 async fn rev_iter_kv<K: CopyFromSlice + FromVnodeBytes>(
1967 &self,
1968 vnode: VirtualNode,
1969 (start, end): (Bound<Bytes>, Bound<Bytes>),
1970 prefix_hint: Option<Bytes>,
1971 prefetch_options: PrefetchOptions,
1972 ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
1973 if let Some(m) = &self.metrics {
1974 m.iter_count.inc();
1975 }
1976 let (pruned_start, pruned_end, should_prune_entirely) = if let Some(stats) =
1978 &self.vnode_stats
1979 && let Some(vnode_stat) = stats.get(&vnode)
1980 {
1981 match vnode_stat.pruned_key_range(&start, &end) {
1982 Some((new_start, new_end)) => {
1983 if self.enable_state_table_vnode_stats_pruning {
1984 (new_start, new_end, false)
1985 } else {
1986 (start, end, false)
1988 }
1989 }
1990 None => {
1991 if let Some(m) = &self.metrics {
1992 m.iter_vnode_pruned_count.inc();
1993 }
1994 (start, end, true)
1996 }
1997 }
1998 } else {
1999 (start, end, false)
2000 };
2001
2002 if should_prune_entirely && self.enable_state_table_vnode_stats_pruning {
2003 return Ok(futures::future::Either::Left(futures::stream::empty()));
2004 }
2005
2006 let table_id = self.table_id;
2007 let inspect_fn = move |result: &StreamExecutorResult<(K, OwnedRow)>| {
2008 if should_prune_entirely && result.is_ok() {
2010 tracing::warn!(
2011 table_id = %table_id,
2012 "vnode stats pruning dry run fails for rev_iter. This will not affect correctness."
2013 );
2014 }
2015 };
2016
2017 if let Some(rows) = &self.all_rows {
2018 return Ok(futures::future::Either::Right(
2019 futures::future::Either::Left(
2020 futures::stream::iter(
2021 rows.get(&vnode)
2022 .expect("covered vnode")
2023 .range((pruned_start, pruned_end))
2024 .rev()
2025 .map(move |(key, value)| {
2026 Ok((K::from_vnode_bytes(vnode, key), value.clone()))
2027 }),
2028 )
2029 .inspect(inspect_fn),
2030 ),
2031 ));
2032 }
2033 let read_options = ReadOptions {
2034 prefix_hint,
2035 prefetch_options,
2036 cache_policy: CachePolicy::Fill(Hint::Normal),
2037 };
2038
2039 Ok(futures::future::Either::Right(
2040 futures::future::Either::Right(
2041 deserialize_keyed_row_stream(
2042 self.state_store
2043 .rev_iter(
2044 prefixed_range_with_vnode((pruned_start, pruned_end), vnode),
2045 read_options,
2046 )
2047 .await?,
2048 &*self.row_serde,
2049 )
2050 .inspect(inspect_fn),
2051 ),
2052 ))
2053 }
2054}
2055
2056impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
2057where
2058 S: StateStore,
2059 SD: ValueRowSerde,
2060{
2061 pub async fn iter_with_prefix(
2065 &self,
2066 pk_prefix: impl Row,
2067 sub_range: &(Bound<impl Row>, Bound<impl Row>),
2068 prefetch_options: PrefetchOptions,
2069 ) -> StreamExecutorResult<impl RowStream<'_>> {
2070 let stream = self.iter_with_prefix_inner::<false, ()>(pk_prefix, sub_range, prefetch_options)
2071 .await?;
2072 Ok(stream.map_ok(|(_, row)| row))
2073 }
2074
2075 pub async fn iter_with_prefix_respecting_watermark(
2081 &self,
2082 pk_prefix: impl Row,
2083 sub_range: &(Bound<impl Row>, Bound<impl Row>),
2084 prefetch_options: PrefetchOptions,
2085 ) -> StreamExecutorResult<BoxedRowStream<'_>> {
2086 let vnode = self.compute_prefix_vnode(&pk_prefix);
2087 let Some(clean_watermark_index) = self.clean_watermark_index else {
2088 return self
2089 .iter_with_prefix(pk_prefix, sub_range, prefetch_options)
2090 .await
2091 .map(|s| s.boxed());
2092 };
2093 let Some((watermark_serde, watermark_type)) = &self.watermark_serde else {
2094 return Err(StreamExecutorError::from(anyhow!(
2095 "Missing watermark serde"
2096 )));
2097 };
2098 if matches!(watermark_type, WatermarkSerdeType::PkPrefix) {
2100 return self
2101 .iter_with_prefix(pk_prefix, sub_range, prefetch_options)
2102 .await
2103 .map(|s| s.boxed());
2104 }
2105
2106 let watermark_bytes = self.row_store.state_store.get_table_watermark(vnode);
2107 let Some(watermark_bytes) = watermark_bytes else {
2108 return self
2109 .iter_with_prefix(pk_prefix, sub_range, prefetch_options)
2110 .await
2111 .map(|s| s.boxed());
2112 };
2113 let watermark_row = watermark_serde.deserialize(&watermark_bytes)?;
2114 if watermark_row.len() != 1 {
2115 return Err(StreamExecutorError::from(format!(
2116 "Watermark row should have exactly 1 column, got {}",
2117 watermark_row.len()
2118 )));
2119 }
2120 let watermark_value = watermark_row[0].clone();
2121 if watermark_value.is_none() {
2123 return Err(StreamExecutorError::from(anyhow!(
2124 "Watermark cannot be NULL"
2125 )));
2126 }
2127 let order_type = watermark_serde.get_order_types().get(0).ok_or_else(|| {
2128 StreamExecutorError::from(anyhow!(
2129 "Watermark serde should have at least one order type"
2130 ))
2131 })?;
2132
2133 let direction = if order_type.is_ascending() {
2134 WatermarkDirection::Ascending
2135 } else {
2136 WatermarkDirection::Descending
2137 };
2138 let clean_watermark_index_in_pk = self
2139 .pk_indices
2140 .iter()
2141 .position(|&i| i == clean_watermark_index);
2142 let clean_watermark_index_in_value = match &self.value_indices {
2143 Some(value_indices) => value_indices
2144 .iter()
2145 .position(|idx| *idx == clean_watermark_index)
2146 .ok_or_else(|| {
2147 StreamExecutorError::from(anyhow!(
2148 "clean watermark column index {} is not included in table value indices {:?}",
2149 clean_watermark_index,
2150 value_indices
2151 ))
2152 })?,
2153 None => clean_watermark_index,
2154 };
2155
2156 let stream = self
2157 .iter_with_prefix_inner::<false, Bytes>(pk_prefix, sub_range, prefetch_options)
2158 .await?
2159 .try_filter_map(move |(pk, row)| {
2160 let should_filter = match watermark_type {
2161 WatermarkSerdeType::PkPrefix => unreachable!(),
2162 WatermarkSerdeType::NonPkPrefix => {
2163 let table_key = TableKey(pk);
2164 let (vnode, key) = table_key.split_vnode();
2165 let pk_cols = self.pk_serde
2166 .deserialize(key)
2167 .unwrap_or_else(|e| {
2168 panic!("Failed to deserialize table {} vnode {:?} key {:?} error: {:?}", self.table_id(), vnode, key, e.as_report());
2169 });
2170 direction.datum_filter_by_watermark(
2171 pk_cols.datum_at(clean_watermark_index_in_pk.unwrap()),
2172 &watermark_value,
2173 *order_type,
2174 )
2175 },
2176 WatermarkSerdeType::Value => {
2177 direction.datum_filter_by_watermark(
2178 row.datum_at(clean_watermark_index_in_value),
2179 &watermark_value,
2180 *order_type,
2181 )
2182 }
2183 };
2184 if should_filter {
2185 ready(Ok(None))
2186 } else {
2187 ready(Ok(Some(row)))
2188 }
2189 });
2190 Ok(stream.boxed())
2191 }
2192
2193 pub async fn get_from_one_row_table(&self) -> StreamExecutorResult<Option<OwnedRow>> {
2195 let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
2196 let stream = self
2197 .iter_with_prefix(row::empty(), sub_range, Default::default())
2198 .await?;
2199 pin_mut!(stream);
2200
2201 if let Some(res) = stream.next().await {
2202 let value = res?.into_owned_row();
2203 assert!(stream.next().await.is_none());
2204 Ok(Some(value))
2205 } else {
2206 Ok(None)
2207 }
2208 }
2209
2210 pub async fn get_from_one_value_table(&self) -> StreamExecutorResult<Option<ScalarImpl>> {
2215 Ok(self
2216 .get_from_one_row_table()
2217 .await?
2218 .and_then(|row| row[0].clone()))
2219 }
2220
2221 pub async fn iter_keyed_row_with_prefix(
2222 &self,
2223 pk_prefix: impl Row,
2224 sub_range: &(Bound<impl Row>, Bound<impl Row>),
2225 prefetch_options: PrefetchOptions,
2226 ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
2227 Ok(
2228 self.iter_with_prefix_inner::<false, Bytes>(pk_prefix, sub_range, prefetch_options)
2229 .await?.map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)),
2230 )
2231 }
2232
2233 pub async fn rev_iter_keyed_row_with_prefix(
2234 &self,
2235 pk_prefix: impl Row,
2236 sub_range: &(Bound<impl Row>, Bound<impl Row>),
2237 prefetch_options: PrefetchOptions,
2238 ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
2239 Ok(
2240 self.iter_with_prefix_inner::<true, Bytes>(pk_prefix, sub_range, prefetch_options)
2241 .await?.map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)),
2242 )
2243 }
2244
2245 pub async fn rev_iter_with_prefix(
2247 &self,
2248 pk_prefix: impl Row,
2249 sub_range: &(Bound<impl Row>, Bound<impl Row>),
2250 prefetch_options: PrefetchOptions,
2251 ) -> StreamExecutorResult<impl RowStream<'_>> {
2252 Ok(
2253 self.iter_with_prefix_inner::<true, ()>(pk_prefix, sub_range, prefetch_options)
2254 .await?.map_ok(|(_, row)| row),
2255 )
2256 }
2257
2258 async fn iter_with_prefix_inner<const REVERSE: bool, K: CopyFromSlice + FromVnodeBytes>(
2259 &self,
2260 pk_prefix: impl Row,
2261 sub_range: &(Bound<impl Row>, Bound<impl Row>),
2262 prefetch_options: PrefetchOptions,
2263 ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
2264 let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
2265 let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
2266
2267 let vnode = self.compute_prefix_vnode(&pk_prefix);
2271
2272 let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
2274 if self.prefix_hint_len != 0 && !IS_REPLICATED {
2275 debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
2276 }
2277 let prefix_hint = {
2278 if should_calculate_prefix_hint(self.prefix_hint_len, pk_prefix.len(), true) {
2279 let encoded_prefix_len = self
2280 .pk_serde
2281 .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;
2282
2283 Some(Bytes::copy_from_slice(
2284 &encoded_prefix[..encoded_prefix_len],
2285 ))
2286 } else {
2287 None
2288 }
2289 };
2290
2291 trace!(
2292 table_id = %self.table_id(),
2293 ?prefix_hint, ?pk_prefix,
2294 ?pk_prefix_indices,
2295 iter_direction = if REVERSE { "reverse" } else { "forward" },
2296 "storage_iter_with_prefix"
2297 );
2298
2299 let memcomparable_range =
2300 prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);
2301
2302 Ok(if REVERSE {
2303 futures::future::Either::Left(
2304 self.row_store
2305 .rev_iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
2306 .await?,
2307 )
2308 } else {
2309 futures::future::Either::Right(
2310 self.row_store
2311 .iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
2312 .await?,
2313 )
2314 })
2315 }
2316
2317 async fn iter_kv_with_pk_range<'a, K: CopyFromSlice + FromVnodeBytes>(
2320 &'a self,
2321 pk_range: &(Bound<impl Row>, Bound<impl Row>),
2322 vnode: VirtualNode,
2326 prefetch_options: PrefetchOptions,
2327 ) -> StreamExecutorResult<impl PkRowStream<'a, K>> {
2328 let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);
2329
2330 self.row_store
2332 .iter_kv(vnode, memcomparable_range, None, prefetch_options)
2333 .await
2334 }
2335}
2336
2337fn deserialize_keyed_row_stream<'a, K: CopyFromSlice>(
2338 iter: impl StateStoreIter + 'a,
2339 deserializer: &'a impl ValueRowSerde,
2340) -> impl PkRowStream<'a, K> {
2341 iter.into_stream(move |(key, value)| {
2342 Ok((
2343 K::copy_from_slice(key.user_key.table_key.as_ref()),
2344 deserializer.deserialize(value).map(OwnedRow::new)?,
2345 ))
2346 })
2347 .map_err(Into::into)
2348}
2349
2350pub fn prefix_range_to_memcomparable(
2351 pk_serde: &OrderedRowSerde,
2352 range: &(Bound<impl Row>, Bound<impl Row>),
2353) -> (Bound<Bytes>, Bound<Bytes>) {
2354 (
2355 start_range_to_memcomparable(pk_serde, &range.0),
2356 end_range_to_memcomparable(pk_serde, &range.1, None),
2357 )
2358}
2359
2360fn prefix_and_sub_range_to_memcomparable(
2361 pk_serde: &OrderedRowSerde,
2362 sub_range: &(Bound<impl Row>, Bound<impl Row>),
2363 pk_prefix: impl Row,
2364) -> (Bound<Bytes>, Bound<Bytes>) {
2365 let (range_start, range_end) = sub_range;
2366 let prefix_serializer = pk_serde.prefix(pk_prefix.len());
2367 let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
2368 let start_range = match range_start {
2369 Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))),
2370 Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))),
2371 Unbounded => Bound::Included(Either::Right(&pk_prefix)),
2372 };
2373 let end_range = match range_end {
2374 Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)),
2375 Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)),
2376 Unbounded => Unbounded,
2377 };
2378 (
2379 start_range_to_memcomparable(pk_serde, &start_range),
2380 end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
2381 )
2382}
2383
2384fn start_range_to_memcomparable<R: Row>(
2385 pk_serde: &OrderedRowSerde,
2386 bound: &Bound<R>,
2387) -> Bound<Bytes> {
2388 let serialize_pk_prefix = |pk_prefix: &R| {
2389 let prefix_serializer = pk_serde.prefix(pk_prefix.len());
2390 serialize_pk(pk_prefix, &prefix_serializer)
2391 };
2392 match bound {
2393 Unbounded => Unbounded,
2394 Included(r) => {
2395 let serialized = serialize_pk_prefix(r);
2396
2397 Included(serialized)
2398 }
2399 Excluded(r) => {
2400 let serialized = serialize_pk_prefix(r);
2401
2402 start_bound_of_excluded_prefix(&serialized)
2403 }
2404 }
2405}
2406
2407fn end_range_to_memcomparable<R: Row>(
2408 pk_serde: &OrderedRowSerde,
2409 bound: &Bound<R>,
2410 serialized_pk_prefix: Option<Bytes>,
2411) -> Bound<Bytes> {
2412 let serialize_pk_prefix = |pk_prefix: &R| {
2413 let prefix_serializer = pk_serde.prefix(pk_prefix.len());
2414 serialize_pk(pk_prefix, &prefix_serializer)
2415 };
2416 match bound {
2417 Unbounded => match serialized_pk_prefix {
2418 Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix),
2419 None => Unbounded,
2420 },
2421 Included(r) => {
2422 let serialized = serialize_pk_prefix(r);
2423 end_bound_of_prefix(&serialized)
2425 }
2426 Excluded(r) => {
2427 let serialized = serialize_pk_prefix(r);
2428 Excluded(serialized)
2429 }
2430 }
2431}
2432
2433fn fill_non_output_indices(
2434 i2o_mapping: &ColIndexMapping,
2435 data_types: &[DataType],
2436 chunk: StreamChunk,
2437) -> StreamChunk {
2438 let cardinality = chunk.cardinality();
2439 let (ops, columns, vis) = chunk.into_inner();
2440 let mut full_columns = Vec::with_capacity(data_types.len());
2441 for (i, data_type) in data_types.iter().enumerate() {
2442 if let Some(j) = i2o_mapping.try_map(i) {
2443 full_columns.push(columns[j].clone());
2444 } else {
2445 let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
2446 column_builder.append_n_null(cardinality);
2447 let column: ArrayRef = column_builder.finish().into();
2448 full_columns.push(column)
2449 }
2450 }
2451 let data_chunk = DataChunk::new(full_columns, vis);
2452 StreamChunk::from_parts(ops, data_chunk)
2453}
2454
#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    /// Compares the pretty-printed (`{:#?}`) form of `actual` against an `expect!` snapshot.
    fn check(actual: impl Debug, expect: Expect) {
        let actual = format!("{:#?}", actual);
        expect.assert_eq(&actual);
    }

    /// Checks that `fill_non_output_indices` places the replicated columns at the table
    /// positions given by the mapping and fills the unmapped column with NULLs.
    #[test]
    fn test_fill_non_output_indices() {
        // Full table schema: three INT32 columns.
        let data_types = vec![DataType::Int32, DataType::Int32, DataType::Int32];
        // The replicated chunk carries only two columns: a single row (222, 2).
        let replicated_chunk = [OwnedRow::new(vec![
            Some(222_i32.into()),
            Some(2_i32.into()),
        ])];
        let replicated_chunk = StreamChunk::from_parts(
            vec![Op::Insert],
            DataChunk::from_rows(&replicated_chunk, &[DataType::Int32, DataType::Int32]),
        );
        // Table column 0 <- chunk column 1, table column 1 has no source (NULL),
        // table column 2 <- chunk column 0.
        let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
        let filled_chunk = fill_non_output_indices(&i2o_mapping, &data_types, replicated_chunk);
        // Expected single row: (2, NULL, 222).
        check(
            filled_chunk,
            expect![[r#"
                StreamChunk { cardinality: 1, capacity: 1, data:
                +---+---+---+-----+
                | + | 2 | | 222 |
                +---+---+---+-----+
                }"#]],
        );
    }
}