1use std::collections::{BTreeMap, HashMap};
16use std::marker::PhantomData;
17use std::ops::Bound;
18use std::ops::Bound::*;
19use std::sync::Arc;
20use std::time::Instant;
21
22use anyhow::anyhow;
23use bytes::Bytes;
24use either::Either;
25use foyer::Hint;
26use futures::future::{ready, try_join_all};
27use futures::stream::BoxStream;
28use futures::{Stream, StreamExt, TryStreamExt, pin_mut};
29use itertools::Itertools;
30use risingwave_common::array::stream_record::Record;
31use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
32use risingwave_common::bitmap::Bitmap;
33use risingwave_common::catalog::{
34 ColumnDesc, ColumnId, TableId, TableOption, get_dist_key_in_pk_indices,
35};
36use risingwave_common::config::StreamingConfig;
37use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
38use risingwave_common::id::FragmentId;
39use risingwave_common::row::{self, OwnedRow, Row, RowExt};
40use risingwave_common::types::{DataType, ScalarImpl};
41use risingwave_common::util::column_index_mapping::ColIndexMapping;
42use risingwave_common::util::epoch::EpochPair;
43use risingwave_common::util::row_serde::OrderedRowSerde;
44use risingwave_common::util::sort_util::{OrderType, cmp_datum};
45use risingwave_common::util::value_encoding::BasicSerde;
46use risingwave_hummock_sdk::HummockReadEpoch;
47use risingwave_hummock_sdk::key::{
48 CopyFromSlice, TableKey, end_bound_of_prefix, next_key, prefix_slice_with_vnode,
49 prefixed_range_with_vnode, start_bound_of_excluded_prefix,
50};
51use risingwave_hummock_sdk::table_watermark::{
52 VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
53};
54use risingwave_pb::catalog::Table;
55use risingwave_pb::plan_common::StorageTableDesc;
56use risingwave_storage::StateStore;
57use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
58use risingwave_storage::hummock::CachePolicy;
59use risingwave_storage::mem_table::MemTableError;
60use risingwave_storage::row_serde::find_columns_by_ids;
61use risingwave_storage::row_serde::row_serde_util::{
62 deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode, serialize_row,
63};
64use risingwave_storage::row_serde::value_serde::ValueRowSerde;
65use risingwave_storage::store::*;
66use risingwave_storage::table::{KeyedRow, TableDistribution, should_calculate_prefix_hint};
67use thiserror_ext::AsReport;
68use tracing::{Instrument, trace};
69
70use crate::cache::keyed_cache_may_stale;
71use crate::executor::monitor::streaming_stats::StateTableMetrics;
72use crate::executor::{StreamExecutorError, StreamExecutorResult};
73
/// Randomly bails out of the enclosing function when "insane" consistency mode is on.
///
/// Expands to a check of `crate::consistency::insane()`; when that returns `true`, the
/// enclosing function returns early with probability 0.3. The expansion contains a bare
/// `return;`, so it is only usable inside functions that return `()`.
macro_rules! insane_mode_discard_point {
    () => {{
        use rand::Rng;
        // `&&` short-circuits, so the RNG is only consulted in insane mode.
        let should_discard = crate::consistency::insane() && rand::rng().random_bool(0.3);
        if should_discard {
            return;
        }
    }};
}
84
/// Smallest and largest keys recorded for a single vnode.
///
/// The bounds are used by the `can_prune*` helpers to decide whether a point lookup or a range
/// scan can be skipped or narrowed. Both bounds are `None` until a key has been recorded.
struct VnodeStatistics {
    /// Smallest key recorded so far, if any.
    min_key: Option<Bytes>,
    /// Largest key recorded so far, if any.
    max_key: Option<Bytes>,
}
92
93impl VnodeStatistics {
94 fn new() -> Self {
95 Self {
96 min_key: None,
97 max_key: None,
98 }
99 }
100
101 fn update_with_key(&mut self, key: &Bytes) {
102 if let Some(min) = &self.min_key {
103 if key < min {
104 self.min_key = Some(key.clone());
105 }
106 } else {
107 self.min_key = Some(key.clone());
108 }
109
110 if let Some(max) = &self.max_key {
111 if key > max {
112 self.max_key = Some(key.clone());
113 }
114 } else {
115 self.max_key = Some(key.clone());
116 }
117 }
118
119 fn can_prune(&self, key: &Bytes) -> bool {
120 if let Some(min) = &self.min_key
121 && key < min
122 {
123 return true;
124 }
125 if let Some(max) = &self.max_key
126 && key > max
127 {
128 return true;
129 }
130 false
131 }
132
133 fn can_prune_range(&self, start: &Bound<Bytes>, end: &Bound<Bytes>) -> bool {
134 if let Some(max) = &self.max_key {
136 match start {
137 Included(s) if s > max => return true,
138 Excluded(s) if s >= max => return true,
139 _ => {}
140 }
141 }
142 if let Some(min) = &self.min_key {
143 match end {
144 Included(e) if e < min => return true,
145 Excluded(e) if e <= min => return true,
146 _ => {}
147 }
148 }
149 false
150 }
151
152 fn pruned_key_range(
153 &self,
154 start: &Bound<Bytes>,
155 end: &Bound<Bytes>,
156 ) -> Option<(Bound<Bytes>, Bound<Bytes>)> {
157 if self.can_prune_range(start, end) {
158 return None;
159 }
160 let new_start = if let Some(min) = &self.min_key {
161 match start {
162 Included(s) if s <= min => Included(min.clone()),
163 Excluded(s) if s < min => Included(min.clone()),
164 _ => start.clone(),
165 }
166 } else {
167 start.clone()
168 };
169
170 let new_end = if let Some(max) = &self.max_key {
171 match end {
172 Included(e) if e >= max => Included(max.clone()),
173 Excluded(e) if e > max => Included(max.clone()),
174 _ => end.clone(),
175 }
176 } else {
177 end.clone()
178 };
179
180 Some((new_start, new_end))
181 }
182}
183
/// A table of rows backed by a state store.
///
/// Type parameters: `S` is the state store, `SD` the serde used for row values (defaults to
/// `BasicSerde`), and `IS_REPLICATED` selects the replicated flavour of the table.
pub struct StateTableInner<S, SD = BasicSerde, const IS_REPLICATED: bool = false>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Id of the table.
    table_id: TableId,

    /// Row-level access layer on top of the local state store (`S::Local`).
    row_store: StateTableRowStore<S::Local, SD>,

    /// The state store this table was built from; exposed through `state_store()`.
    store: S,

    /// Current epoch pair; `None` until `init_epoch` has been called.
    epoch: Option<EpochPair>,

    /// Serde for the primary key columns.
    pk_serde: OrderedRowSerde,

    /// Indices of the primary key columns within the table columns.
    pk_indices: Vec<usize>,

    /// Vnode distribution of the table; provides the owned vnode bitmap and vnode computation.
    distribution: TableDistribution,

    /// Prefix length hint taken from the table catalog/descriptor (`read_prefix_len_hint`).
    prefix_hint_len: usize,

    /// Indices of the columns stored in the value. `None` when they are exactly all table
    /// columns in their original order, so no projection is needed.
    value_indices: Option<Vec<usize>>,

    /// Index of the column used for watermark-based state cleaning, if any.
    pub clean_watermark_index: Option<usize>,
    /// Watermark that has not been committed yet; starts out as `None`.
    pending_watermark: Option<ScalarImpl>,
    /// Watermark restored from the state store when the table is built, if any.
    committed_watermark: Option<ScalarImpl>,
    /// Serde for the watermark value together with the kind of watermark
    /// (pk prefix, non-prefix pk column, or value column).
    watermark_serde: Option<(OrderedRowSerde, WatermarkSerdeType)>,

    /// Data types of all table columns.
    data_types: Vec<DataType>,

    /// Mapping from a table column index to its position in the output column ids.
    i2o_mapping: ColIndexMapping,

    /// Indices of the output columns within the table columns.
    /// Replicated tables project rows they return onto these indices.
    pub output_indices: Vec<usize>,

    /// Consistency level the table was built with.
    op_consistency_level: StateTableOpConsistencyLevel,

    /// Initialised to `false` when the table is built.
    /// NOTE(review): presumably tracks an in-progress post-commit step — confirm against the
    /// commit path, which is not visible here.
    on_post_commit: bool,
}
254
/// [`StateTableInner`] with the `BasicSerde` value serde and `IS_REPLICATED = false`.
pub type StateTable<S> = StateTableInner<S, BasicSerde>;
/// [`StateTableInner`] with `IS_REPLICATED = true` and a caller-chosen value serde.
pub type ReplicatedStateTable<S, SD> = StateTableInner<S, SD, true>;
260
261impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
263where
264 S: StateStore,
265 SD: ValueRowSerde,
266{
267 pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
270 self.row_store
271 .init(epoch, self.distribution.vnodes())
272 .await?;
273 assert_eq!(None, self.epoch.replace(epoch), "should not init for twice");
274 Ok(())
275 }
276
277 pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
278 self.store
279 .try_wait_epoch(
280 HummockReadEpoch::Committed(prev_epoch),
281 TryWaitEpochOptions {
282 table_id: self.table_id,
283 },
284 )
285 .await
286 }
287
288 pub fn state_store(&self) -> &S {
289 &self.store
290 }
291}
292
293fn consistent_old_value_op(
294 row_serde: Arc<impl ValueRowSerde>,
295 is_log_store: bool,
296) -> OpConsistencyLevel {
297 OpConsistencyLevel::ConsistentOldValue {
298 check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
299 if first == second {
300 return true;
301 }
302 let first = match row_serde.deserialize(first) {
303 Ok(rows) => rows,
304 Err(e) => {
305 error!(error = %e.as_report(), value = ?first, "fail to deserialize serialized value");
306 return false;
307 }
308 };
309 let second = match row_serde.deserialize(second) {
310 Ok(rows) => rows,
311 Err(e) => {
312 error!(error = %e.as_report(), value = ?second, "fail to deserialize serialized value");
313 return false;
314 }
315 };
316 if first != second {
317 error!(first = ?first, second = ?second, "sanity check fail");
318 false
319 } else {
320 true
321 }
322 }),
323 is_log_store,
324 }
325}
326
/// Evaluates `$body` with the listed row variables optionally projected onto value indices.
///
/// When `$value_indices` matches `Some(value_indices)`, every identifier in the bracketed list
/// is shadowed by `ident.project(value_indices)` before `$body` runs; otherwise `$body` runs
/// with the variables untouched. `$body` is expanded once in each branch, so the two branches
/// may see different types for the row variables.
macro_rules! dispatch_value_indices {
    ($value_indices:expr, [$($row_var_name:ident),+], $body:expr) => {
        if let Some(value_indices) = $value_indices {
            $(
                // Shadow the caller's variable with its projection.
                let $row_var_name = $row_var_name.project(value_indices);
            )+
            $body
        } else {
            $body
        }
    };
}
339
/// Row-level layer over a local state store, with optional in-memory row preloading and
/// optional per-vnode key statistics.
struct StateTableRowStore<LS: LocalStateStore, SD: ValueRowSerde> {
    /// The underlying local state store.
    state_store: LS,
    /// When `Some`, an in-memory copy of the table's rows, grouped by vnode and ordered by
    /// key (the key with its vnode prefix split off). `None` when preloading is disabled.
    all_rows: Option<HashMap<VirtualNode, BTreeMap<Bytes, OwnedRow>>>,

    /// Id of the table; used in log messages.
    table_id: TableId,
    /// Serde for row values.
    row_serde: Arc<SD>,
    /// Serde for the primary key.
    pk_serde: OrderedRowSerde,

    /// When `Some`, min/max key statistics per vnode. `may_load_vnode_stats` asserts that
    /// this is never enabled together with `all_rows`.
    vnode_stats: Option<HashMap<VirtualNode, VnodeStatistics>>,
    /// Whether the vnode statistics may be used to prune reads.
    enable_state_table_vnode_stats_pruning: bool,
    /// Optional metrics handle for this table.
    pub metrics: Option<StateTableMetrics>,
}
362
363impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
364 async fn may_load_vnode_stats(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
365 if self.vnode_stats.is_none() {
366 return Ok(());
367 }
368
369 assert!(self.all_rows.is_none());
371
372 let start_time = Instant::now();
373 let mut stats_map = HashMap::new();
374
375 for vnode in vnode_bitmap.iter_vnodes() {
377 let mut stats = VnodeStatistics::new();
378
379 let memcomparable_range_with_vnode = prefixed_range_with_vnode::<Bytes>(.., vnode);
381 let read_options = ReadOptions {
382 cache_policy: CachePolicy::Fill(Hint::Low),
383 ..Default::default()
384 };
385
386 let mut iter = self
387 .state_store
388 .iter(memcomparable_range_with_vnode.clone(), read_options.clone())
389 .await?;
390 if let Some(item) = iter.try_next().await? {
391 let (key_vnode, key_without_vnode) = item.0.user_key.table_key.split_vnode();
392 assert_eq!(vnode, key_vnode);
393 stats.min_key = Some(Bytes::copy_from_slice(key_without_vnode));
394 }
395
396 let mut rev_iter = self
398 .state_store
399 .rev_iter(memcomparable_range_with_vnode, read_options)
400 .await?;
401 if let Some(item) = rev_iter.try_next().await? {
402 let (key_vnode, key_without_vnode) = item.0.user_key.table_key.split_vnode();
403 assert_eq!(vnode, key_vnode);
404 stats.max_key = Some(Bytes::copy_from_slice(key_without_vnode));
405 }
406
407 stats_map.insert(vnode, stats);
408 }
409
410 self.vnode_stats = Some(stats_map);
411
412 if !cfg!(debug_assertions) {
414 info!(
415 table_id = %self.table_id,
416 vnode_count = vnode_bitmap.count_ones(),
417 duration = ?start_time.elapsed(),
418 "finished initializing vnode statistics"
419 );
420 }
421
422 Ok(())
423 }
424
    /// Reloads the in-memory row cache for every vnode in `vnode_bitmap`.
    ///
    /// Does nothing when preloading is disabled (`all_rows` is `None`). Otherwise the cache is
    /// cleared and every vnode is scanned in full; the scans are driven concurrently through
    /// `try_join_all`. If any scan fails, the error is returned and the cache stays cleared.
    ///
    /// # Panics
    ///
    /// Panics if a scanned key belongs to a different vnode than the one being scanned, or if
    /// the same key is yielded twice within a vnode.
    async fn may_reload_all_rows(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        if let Some(rows) = &mut self.all_rows {
            rows.clear();
            let start_time = Instant::now();
            *rows = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| {
                // Borrow the fields individually so the `async move` block captures only
                // references and not `self`.
                let state_store = &self.state_store;
                let row_serde = &self.row_serde;
                async move {
                    let mut rows = BTreeMap::new();
                    let memcomparable_range_with_vnode =
                        prefixed_range_with_vnode::<Bytes>(.., vnode);
                    let stream = deserialize_keyed_row_stream::<Bytes>(
                        state_store
                            .iter(
                                memcomparable_range_with_vnode,
                                ReadOptions {
                                    prefix_hint: None,
                                    prefetch_options: Default::default(),
                                    cache_policy: Default::default(),
                                },
                            )
                            .await?,
                        &**row_serde,
                    );
                    pin_mut!(stream);
                    while let Some((encoded_key, row)) = stream.try_next().await? {
                        let key = TableKey(encoded_key);
                        // Rows are stored per vnode, keyed by the key without its vnode prefix.
                        let (iter_vnode, key) = key.split_vnode_bytes();
                        assert_eq!(vnode, iter_vnode);
                        rows.try_insert(key, row).expect("non-duplicated");
                    }
                    Ok((vnode, rows)) as StreamExecutorResult<_>
                }
            }))
            .await?
            .into_iter()
            .collect();
            // Only logged in builds without debug assertions.
            if !cfg!(debug_assertions) {
                info!(table_id = %self.table_id, vnode_count = vnode_bitmap.count_ones(), duration = ?start_time.elapsed(),"finished reloading all rows");
            }
        }
        Ok(())
    }
470
471 async fn init(&mut self, epoch: EpochPair, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
472 self.state_store.init(InitOptions::new(epoch)).await?;
473 self.may_reload_all_rows(vnode_bitmap).await?;
474 self.may_load_vnode_stats(vnode_bitmap).await
475 }
476
477 async fn update_vnode_bitmap(
478 &mut self,
479 vnodes: Arc<Bitmap>,
480 ) -> StreamExecutorResult<Arc<Bitmap>> {
481 let prev_vnodes = self.state_store.update_vnode_bitmap(vnodes.clone()).await?;
482 self.may_reload_all_rows(&vnodes).await?;
483 self.may_load_vnode_stats(&vnodes).await?;
484
485 Ok(prev_vnodes)
486 }
487
488 async fn try_flush(&mut self) -> StreamExecutorResult<()> {
489 self.state_store.try_flush().await?;
490 Ok(())
491 }
492
    /// Flushes pending writes and seals the current epoch of the local state store.
    ///
    /// Steps, in order:
    /// 1. If watermarks are given and rows are preloaded, apply the watermarks to the
    ///    in-memory rows (pk-prefix watermarks) or drop the preloaded rows entirely
    ///    (non-pk-prefix and value watermarks).
    /// 2. Flush the local state store.
    /// 3. Seal the epoch, passing on the watermarks and the optional new consistency level.
    ///
    /// # Panics
    ///
    /// Panics if a pk-prefix watermark covers a vnode with no entry in the preloaded rows.
    async fn seal_current_epoch(
        &mut self,
        next_epoch: u64,
        table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<()> {
        if let Some((direction, watermarks, serde_type)) = &table_watermarks
            && let Some(rows) = &mut self.all_rows
        {
            match serde_type {
                WatermarkSerdeType::PkPrefix => {
                    for vnode_watermark in watermarks {
                        match direction {
                            WatermarkDirection::Ascending => {
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    // `split_off` returns the entries with key >= watermark;
                                    // keep those and drop everything below the watermark.
                                    *rows = rows.split_off(vnode_watermark.watermark());
                                }
                            }
                            WatermarkDirection::Descending => {
                                // NOTE(review): `next_key` presumably yields the smallest key
                                // ordered after every key with the watermark as prefix —
                                // confirm against its definition.
                                let split_off_key = next_key(vnode_watermark.watermark());
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    // The returned upper part (key >= split_off_key) is
                                    // dropped; the lower part stays in `rows`.
                                    rows.split_off(split_off_key.as_slice());
                                }
                            }
                        }
                    }
                }
                WatermarkSerdeType::NonPkPrefix => {
                    warn!(table_id = %self.table_id, "table enabled preloading rows got disabled by written non pk prefix watermark");
                    // Such a watermark is not applied to the in-memory rows; stop preloading.
                    self.all_rows = None;
                }
                WatermarkSerdeType::Value => {
                    warn!(table_id = %self.table_id, "table enabled preloading rows got disabled by written value watermark");
                    // Such a watermark is not applied to the in-memory rows; stop preloading.
                    self.all_rows = None;
                }
            }
        }
        self.state_store
            .flush()
            .instrument(tracing::info_span!("state_table_flush"))
            .await?;
        // Translate the table-level consistency level into the storage-level one.
        let switch_op_consistency_level =
            switch_consistent_op.map(|new_consistency_level| match new_consistency_level {
                StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
                StateTableOpConsistencyLevel::ConsistentOldValue => {
                    consistent_old_value_op(self.row_serde.clone(), false)
                }
                StateTableOpConsistencyLevel::LogStoreEnabled => {
                    consistent_old_value_op(self.row_serde.clone(), true)
                }
            });
        self.state_store.seal_current_epoch(
            next_epoch,
            SealCurrentEpochOptions {
                table_watermarks,
                switch_op_consistency_level,
            },
        );
        Ok(())
    }
559}
560
/// Consistency level of state table operations, translated into the storage-level
/// `OpConsistencyLevel` when a table is built or an epoch is sealed.
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
    /// Maps to `OpConsistencyLevel::Inconsistent`.
    Inconsistent,
    /// Maps to `OpConsistencyLevel::ConsistentOldValue` with `is_log_store = false`.
    ConsistentOldValue,
    /// Maps to `OpConsistencyLevel::ConsistentOldValue` with `is_log_store = true`.
    LogStoreEnabled,
}
573
/// Builder for [`StateTableInner`].
///
/// `PreloadAllRow` is a type-state marker: it is `()` until the caller has decided whether
/// rows are preloaded and `bool` afterwards. `build` is only available in the `bool` state.
pub struct StateTableBuilder<S, SD, const IS_REPLICATED: bool, PreloadAllRow> {
    /// Id of the table.
    table_id: TableId,
    /// Table name, used only in assertion messages.
    table_name_for_debug: String,
    /// Descriptors of all table columns.
    table_columns: Vec<ColumnDesc>,
    /// Order type of each primary key column.
    order_types: Vec<OrderType>,
    /// Indices of the primary key columns within the table columns.
    pk_indices: Vec<usize>,
    /// Positions of the distribution key columns within the primary key.
    dist_key_in_pk_indices: Vec<usize>,
    /// Position of the vnode column within the primary key, if any.
    vnode_col_idx_in_pk: Option<usize>,
    /// Vnode count the built distribution is asserted to have.
    expected_vnode_count: usize,
    /// Indices of the columns stored in the value.
    value_indices: Vec<usize>,
    /// Prefix length hint (`read_prefix_len_hint`).
    prefix_hint_len: usize,
    /// Retention period in seconds, if configured.
    retention_seconds: Option<u32>,
    /// Whether the table is versioned; asserted to match the row serde being column-aware.
    versioned: bool,
    /// Id of the fragment the table belongs to.
    fragment_id: FragmentId,
    /// Index of the column used for watermark-based state cleaning, if any.
    clean_watermark_index: Option<usize>,

    /// The state store to build the table on.
    store: S,
    /// Vnode bitmap passed to the table distribution.
    vnodes: Option<Arc<Bitmap>>,
    /// Requested consistency level; `ConsistentOldValue` is used when unset.
    op_consistency_level: Option<StateTableOpConsistencyLevel>,
    /// Ids of the output columns; treated as empty when unset.
    output_column_ids: Option<Vec<ColumnId>>,
    /// Preload decision, see the type-level docs.
    preload_all_rows: PreloadAllRow,
    /// Whether vnode key statistics were requested; ignored when rows are preloaded.
    enable_vnode_key_stats: Option<bool>,
    /// Whether vnode statistics may be used to prune reads.
    enable_state_table_vnode_stats_pruning: bool,
    /// Optional metrics handle handed to the built table.
    metrics: Option<StateTableMetrics>,

    /// Marker tying the builder to the value serde type.
    _serde: PhantomData<SD>,
}
605
606impl<S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
607 StateTableBuilder<S, SD, IS_REPLICATED, ()>
608{
609 fn with_preload_all_rows(
610 self,
611 preload_all_rows: bool,
612 ) -> StateTableBuilder<S, SD, IS_REPLICATED, bool> {
613 StateTableBuilder {
614 table_id: self.table_id,
615 table_name_for_debug: self.table_name_for_debug,
616 table_columns: self.table_columns,
617 order_types: self.order_types,
618 pk_indices: self.pk_indices,
619 dist_key_in_pk_indices: self.dist_key_in_pk_indices,
620 vnode_col_idx_in_pk: self.vnode_col_idx_in_pk,
621 expected_vnode_count: self.expected_vnode_count,
622 value_indices: self.value_indices,
623 prefix_hint_len: self.prefix_hint_len,
624 retention_seconds: self.retention_seconds,
625 versioned: self.versioned,
626 fragment_id: self.fragment_id,
627 clean_watermark_index: self.clean_watermark_index,
628 store: self.store,
629 vnodes: self.vnodes,
630 op_consistency_level: self.op_consistency_level,
631 output_column_ids: self.output_column_ids,
632 preload_all_rows,
633 enable_vnode_key_stats: self.enable_vnode_key_stats,
634 enable_state_table_vnode_stats_pruning: self.enable_state_table_vnode_stats_pruning,
635 metrics: self.metrics,
636 _serde: Default::default(),
637 }
638 }
639
640 pub fn enable_preload_all_rows_by_config(
641 self,
642 config: &StreamingConfig,
643 ) -> StateTableBuilder<S, SD, IS_REPLICATED, bool> {
644 let developer = &config.developer;
645 let preload_all_rows = if developer.default_enable_mem_preload_state_table {
646 !developer
647 .mem_preload_state_table_ids_blacklist
648 .contains(&self.table_id.as_raw_id())
649 } else {
650 developer
651 .mem_preload_state_table_ids_whitelist
652 .contains(&self.table_id.as_raw_id())
653 };
654 self.with_preload_all_rows(preload_all_rows)
655 }
656
657 pub fn forbid_preload_all_rows(self) -> StateTableBuilder<S, SD, IS_REPLICATED, bool> {
658 self.with_preload_all_rows(false)
659 }
660}
661
662impl<S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool, PreloadAllRow>
663 StateTableBuilder<S, SD, IS_REPLICATED, PreloadAllRow>
664{
665 pub fn with_op_consistency_level(
666 mut self,
667 op_consistency_level: StateTableOpConsistencyLevel,
668 ) -> Self {
669 self.op_consistency_level = Some(op_consistency_level);
670 self
671 }
672
673 pub fn enable_vnode_key_stats(mut self, enable: bool, config: &StreamingConfig) -> Self {
674 self.enable_vnode_key_stats = Some(enable);
675 self.enable_state_table_vnode_stats_pruning =
676 enable && config.developer.enable_state_table_vnode_stats_pruning;
677 self
678 }
679
680 pub fn with_metrics(mut self, metrics: StateTableMetrics) -> Self {
681 self.metrics = Some(metrics);
682 self
683 }
684}
685
686impl<S: StateStore, SD: ValueRowSerde, PreloadAllRow>
687 StateTableBuilder<S, SD, true, PreloadAllRow>
688{
689 pub fn with_output_column_ids(mut self, output_column_ids: Vec<ColumnId>) -> Self {
690 self.output_column_ids = Some(output_column_ids);
691 self
692 }
693}
694
695impl<S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
696 StateTableBuilder<S, SD, IS_REPLICATED, bool>
697{
698 pub async fn build(self) -> StateTableInner<S, SD, IS_REPLICATED> {
699 let mut preload_all_rows = self.preload_all_rows;
700 if preload_all_rows
701 && let Err(e) =
702 risingwave_common::license::Feature::StateTableMemoryPreload.check_available()
703 {
704 warn!(table_id=%self.table_id, e=%e.as_report(), "table configured to preload rows to memory but disabled by license");
705 preload_all_rows = false;
706 }
707
708 let should_enable_vnode_key_stats = if preload_all_rows
709 && let Some(enable_vnode_key_stats) = self.enable_vnode_key_stats
710 && enable_vnode_key_stats
711 {
712 false
713 } else {
714 self.enable_vnode_key_stats.unwrap_or(false)
715 };
716 self.build_inner(preload_all_rows, should_enable_vnode_key_stats)
717 .await
718 }
719}
720
721impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
726where
727 S: StateStore,
728 SD: ValueRowSerde,
729{
730 #[cfg(any(test, feature = "test"))]
734 pub async fn from_table_catalog(
735 table_catalog: &Table,
736 store: S,
737 vnodes: Option<Arc<Bitmap>>,
738 ) -> Self {
739 StateTableBuilder::new(table_catalog, store, vnodes)
740 .forbid_preload_all_rows()
741 .build()
742 .await
743 }
744
745 pub async fn from_table_catalog_inconsistent_op(
747 table_catalog: &Table,
748 store: S,
749 vnodes: Option<Arc<Bitmap>>,
750 ) -> Self {
751 StateTableBuilder::new(table_catalog, store, vnodes)
752 .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
753 .forbid_preload_all_rows()
754 .build()
755 .await
756 }
757}
758
759impl<S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
760 StateTableBuilder<S, SD, IS_REPLICATED, ()>
761{
762 pub fn new(table_catalog: &Table, store: S, vnodes: Option<Arc<Bitmap>>) -> Self {
763 let table_id = table_catalog.id;
764 let table_columns: Vec<ColumnDesc> = table_catalog
765 .columns
766 .iter()
767 .map(|col| col.column_desc.as_ref().unwrap().into())
768 .collect();
769 let order_types: Vec<OrderType> = table_catalog
770 .pk
771 .iter()
772 .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
773 .collect();
774 let dist_key_indices: Vec<usize> = table_catalog
775 .distribution_key
776 .iter()
777 .map(|dist_index| *dist_index as usize)
778 .collect();
779
780 let pk_indices = table_catalog
781 .pk
782 .iter()
783 .map(|col_order| col_order.column_index as usize)
784 .collect_vec();
785
786 let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() {
788 get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices).unwrap()
789 } else {
790 table_catalog
791 .get_dist_key_in_pk()
792 .iter()
793 .map(|idx| *idx as usize)
794 .collect()
795 };
796
797 let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
798 let vnode_col_idx = *idx as usize;
799 pk_indices.iter().position(|&i| vnode_col_idx == i)
800 });
801 let value_indices = table_catalog
802 .value_indices
803 .iter()
804 .map(|val| *val as usize)
805 .collect_vec();
806 let clean_watermark_indices = table_catalog.get_clean_watermark_column_indices();
807 if clean_watermark_indices.len() > 1 {
808 unimplemented!("multiple clean watermark columns are not supported yet")
809 }
810 let clean_watermark_index = clean_watermark_indices.first().map(|&i| i as usize);
811
812 Self {
813 table_id,
814 table_name_for_debug: table_catalog.name.clone(),
815 table_columns,
816 order_types,
817 pk_indices,
818 dist_key_in_pk_indices,
819 vnode_col_idx_in_pk,
820 expected_vnode_count: table_catalog.vnode_count(),
821 value_indices,
822 prefix_hint_len: table_catalog.read_prefix_len_hint as usize,
823 retention_seconds: table_catalog.retention_seconds,
824 versioned: table_catalog.version.is_some(),
825 fragment_id: table_catalog.fragment_id,
826 clean_watermark_index,
827 store,
828 vnodes,
829 op_consistency_level: None,
830 output_column_ids: None,
831 preload_all_rows: (),
832 enable_vnode_key_stats: None,
833 enable_state_table_vnode_stats_pruning: false,
834 metrics: None,
835 _serde: Default::default(),
836 }
837 }
838}
839
impl<S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
    StateTableBuilder<S, SD, IS_REPLICATED, bool>
{
    /// Assembles the state table from the builder's fields.
    ///
    /// `preload_all_rows` and `should_enable_vnode_key_stats` are the final decisions made by
    /// `build`; they determine whether the row store starts with an (empty) row cache and/or
    /// an (empty) vnode statistics map. The actual contents are loaded later by the row store.
    ///
    /// # Panics
    ///
    /// Panics if
    /// - the table is replicated, `prefix_hint_len > 0`, and a distribution key position is not
    ///   covered by the prefix;
    /// - the distribution's vnode count differs from the expected one;
    /// - `versioned` disagrees with the row serde being column-aware;
    /// - the table is replicated and a pk column is missing from the output columns.
    async fn build_inner(
        self,
        preload_all_rows: bool,
        should_enable_vnode_key_stats: bool,
    ) -> StateTableInner<S, SD, IS_REPLICATED> {
        let table_id = self.table_id;
        let table_columns = self.table_columns;
        let order_types = self.order_types;
        let pk_indices = self.pk_indices;
        let dist_key_in_pk_indices = self.dist_key_in_pk_indices;
        let vnode_col_idx_in_pk = self.vnode_col_idx_in_pk;
        let prefix_hint_len = self.prefix_hint_len;
        let metrics = self.metrics;

        // Default to checking old values when the caller did not choose a level.
        let op_consistency_level = self
            .op_consistency_level
            .unwrap_or(StateTableOpConsistencyLevel::ConsistentOldValue);

        let output_column_ids = self.output_column_ids.unwrap_or_default();

        let data_types: Vec<DataType> = table_columns
            .iter()
            .map(|col| col.data_type.clone())
            .collect();

        if IS_REPLICATED && prefix_hint_len > 0 {
            assert!(
                dist_key_in_pk_indices.iter().all(|&d| d < prefix_hint_len),
                "replicated state table: distribution key indices {:?} must all be covered by \
                 prefix_hint_len {}",
                dist_key_in_pk_indices,
                prefix_hint_len,
            );
        }

        let distribution =
            TableDistribution::new(self.vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
        assert_eq!(
            distribution.vnode_count(),
            self.expected_vnode_count,
            "vnode count mismatch, scanning table {} under wrong distribution?",
            self.table_name_for_debug,
        );

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);

        let input_value_indices = self.value_indices;

        // The identity projection: every column, in order.
        let no_shuffle_value_indices = (0..table_columns.len()).collect_vec();

        // Store `None` when the value indices are the identity projection, so rows need no
        // projection before being written.
        let value_indices = match input_value_indices.len() == table_columns.len()
            && input_value_indices == no_shuffle_value_indices
        {
            true => None,
            false => Some(input_value_indices.clone()),
        };

        let row_serde = Arc::new(SD::new(
            Arc::from_iter(input_value_indices.iter().copied()),
            Arc::from(table_columns.clone().into_boxed_slice()),
        ));

        // Keep the table-level level for the struct; translate it for the storage layer.
        let state_table_op_consistency_level = op_consistency_level;
        let op_consistency_level = match state_table_op_consistency_level {
            StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
            StateTableOpConsistencyLevel::ConsistentOldValue => {
                consistent_old_value_op(row_serde.clone(), false)
            }
            StateTableOpConsistencyLevel::LogStoreEnabled => {
                consistent_old_value_op(row_serde.clone(), true)
            }
        };

        let table_option = TableOption::new(self.retention_seconds);
        let new_local_options = if IS_REPLICATED {
            NewLocalOptions::new_replicated(
                table_id,
                self.fragment_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
            )
        } else {
            NewLocalOptions::new(
                table_id,
                self.fragment_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
                true,
            )
        };
        let local_state_store = self.store.new_local(new_local_options).await;

        assert_eq!(self.versioned, row_serde.kind().is_column_aware());

        // Column id -> position within `output_column_ids`.
        let output_column_ids_to_input_idx = output_column_ids
            .iter()
            .enumerate()
            .map(|(pos, id)| (*id, pos))
            .collect::<HashMap<_, _>>();

        let columns = table_columns;

        // For each table column, its position in the output columns (if it is one).
        let mut i2o_mapping = vec![None; columns.len()];
        for (i, column) in columns.iter().enumerate() {
            if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
                i2o_mapping[i] = Some(*pos);
            }
        }
        let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());

        let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);

        if IS_REPLICATED {
            assert!(
                pk_indices
                    .iter()
                    .all(|&pk_idx| output_indices.contains(&pk_idx)),
                "all pk columns must be included in output_column_ids for replicated state table"
            );
        }

        let clean_watermark_index = self.clean_watermark_index;
        // Classify the watermark column: first pk column, another pk column, or a value column.
        let watermark_serde = clean_watermark_index.map(|idx| {
            let pk_idx = pk_indices.iter().position(|&i| i == idx);
            let (watermark_serde, watermark_serde_type) = match pk_idx {
                Some(0) => (pk_serde.index(0).into_owned(), WatermarkSerdeType::PkPrefix),
                Some(pk_idx) => (
                    pk_serde.index(pk_idx).into_owned(),
                    WatermarkSerdeType::NonPkPrefix,
                ),
                None => (
                    OrderedRowSerde::new(
                        vec![data_types[idx].clone()],
                        vec![OrderType::ascending()],
                    ),
                    WatermarkSerdeType::Value,
                ),
            };
            (watermark_serde, watermark_serde_type)
        });

        // Restore the committed watermark: the largest (ascending order) watermark persisted
        // for any owned vnode. Watermarks that fail to decode are logged and skipped.
        let committed_watermark = if let Some((deser, _)) = watermark_serde.as_ref() {
            distribution
                .vnodes()
                .iter_vnodes()
                .filter_map(|vnode| {
                    let bytes = local_state_store.get_table_watermark(vnode)?;
                    let datum = deser.deserialize(&bytes).ok().and_then(|row| {
                        assert!(row.len() == 1);
                        row[0].clone()
                    });
                    if datum.is_none() {
                        tracing::error!(
                            ?vnode,
                            watermark = ?bytes,
                            "Failed to deserialize persisted watermark from state store.",
                        );
                    }
                    datum
                })
                .max_by(|a, b| cmp_datum(Some(a), Some(b), OrderType::ascending()))
        } else {
            None
        };

        StateTableInner {
            table_id,
            row_store: StateTableRowStore {
                // Start with empty maps when enabled; contents are loaded by the row store.
                all_rows: preload_all_rows.then(HashMap::new),
                state_store: local_state_store,
                row_serde,
                pk_serde: pk_serde.clone(),
                table_id,
                vnode_stats: should_enable_vnode_key_stats.then(HashMap::new),
                enable_state_table_vnode_stats_pruning: self.enable_state_table_vnode_stats_pruning,
                metrics,
            },
            store: self.store,
            epoch: None,
            pk_serde,
            pk_indices,
            distribution,
            prefix_hint_len,
            value_indices,
            pending_watermark: None,
            committed_watermark,
            watermark_serde,
            data_types,
            output_indices,
            i2o_mapping,
            op_consistency_level: state_table_op_consistency_level,
            clean_watermark_index,
            on_post_commit: false,
        }
    }
}
1064
1065impl<S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
1066 StateTableBuilder<S, SD, IS_REPLICATED, ()>
1067{
1068 pub fn new_from_storage_table_desc(
1069 table_desc: &StorageTableDesc,
1070 store: S,
1071 vnodes: Option<Arc<Bitmap>>,
1072 fragment_id: FragmentId,
1073 ) -> Self {
1074 let table_id = table_desc.table_id;
1075 let table_columns: Vec<ColumnDesc> =
1076 table_desc.columns.iter().map(ColumnDesc::from).collect();
1077 let order_types: Vec<OrderType> = table_desc
1078 .pk
1079 .iter()
1080 .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
1081 .collect();
1082 let pk_indices = table_desc
1083 .pk
1084 .iter()
1085 .map(|col_order| col_order.column_index as usize)
1086 .collect_vec();
1087 let dist_key_in_pk_indices = table_desc
1088 .dist_key_in_pk_indices
1089 .iter()
1090 .map(|&idx| idx as usize)
1091 .collect();
1092 let vnode_col_idx_in_pk = table_desc.vnode_col_idx_in_pk.map(|k| k as usize);
1095 let raw_value_indices = table_desc
1096 .value_indices
1097 .iter()
1098 .map(|val| *val as usize)
1099 .collect_vec();
1100
1101 Self {
1102 table_id,
1103 table_name_for_debug: table_id.to_string(),
1104 table_columns,
1105 order_types,
1106 pk_indices,
1107 dist_key_in_pk_indices,
1108 vnode_col_idx_in_pk,
1109 expected_vnode_count: table_desc.vnode_count(),
1110 value_indices: raw_value_indices,
1111 prefix_hint_len: table_desc.read_prefix_len_hint as usize,
1112 retention_seconds: table_desc.retention_seconds,
1113 versioned: table_desc.versioned,
1114 fragment_id,
1115 clean_watermark_index: None,
1116 store,
1117 vnodes,
1118 op_consistency_level: None,
1119 output_column_ids: None,
1120 preload_all_rows: (),
1121 enable_vnode_key_stats: None,
1122 enable_state_table_vnode_stats_pruning: false,
1123 metrics: None,
1124 _serde: Default::default(),
1125 }
1126 }
1127}
1128
1129impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
1130where
1131 S: StateStore,
1132 SD: ValueRowSerde,
1133{
1134 pub fn get_data_types(&self) -> &[DataType] {
1135 &self.data_types
1136 }
1137
1138 pub fn table_id(&self) -> TableId {
1139 self.table_id
1140 }
1141
1142 fn compute_prefix_vnode(&self, pk_prefix: &impl Row) -> VirtualNode {
1144 self.distribution
1145 .try_compute_vnode_by_pk_prefix(pk_prefix)
1146 .expect("For streaming, the given prefix must be enough to calculate the vnode")
1147 }
1148
1149 pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
1151 self.distribution.compute_vnode_by_pk(pk)
1152 }
1153
1154 pub fn pk_indices(&self) -> &[usize] {
1157 &self.pk_indices
1158 }
1159
1160 pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
1164 assert!(IS_REPLICATED);
1165 self.pk_indices
1166 .iter()
1167 .map(|&i| self.output_indices.iter().position(|&j| i == j))
1168 .collect()
1169 }
1170
1171 pub fn pk_serde(&self) -> &OrderedRowSerde {
1172 &self.pk_serde
1173 }
1174
1175 pub fn vnodes(&self) -> &Arc<Bitmap> {
1176 self.distribution.vnodes()
1177 }
1178
1179 pub fn value_indices(&self) -> &Option<Vec<usize>> {
1180 &self.value_indices
1181 }
1182
1183 pub fn is_consistent_op(&self) -> bool {
1184 matches!(
1185 self.op_consistency_level,
1186 StateTableOpConsistencyLevel::ConsistentOldValue
1187 | StateTableOpConsistencyLevel::LogStoreEnabled
1188 )
1189 }
1190
1191 pub fn metrics(&self) -> Option<&StateTableMetrics> {
1192 self.row_store.metrics.as_ref()
1193 }
1194}
1195
1196impl<S, SD> StateTableInner<S, SD, true>
1197where
1198 S: StateStore,
1199 SD: ValueRowSerde,
1200{
1201 pub async fn new_replicated(
1203 table_catalog: &Table,
1204 store: S,
1205 vnodes: Option<Arc<Bitmap>>,
1206 output_column_ids: Vec<ColumnId>,
1207 ) -> Self {
1208 StateTableBuilder::new(table_catalog, store, vnodes)
1211 .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
1212 .with_output_column_ids(output_column_ids)
1213 .forbid_preload_all_rows()
1214 .build()
1215 .await
1216 }
1217}
1218
1219impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
1221where
1222 S: StateStore,
1223 SD: ValueRowSerde,
1224{
1225 pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
1227 let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
1228 let row = self.row_store.get(serialized_pk, prefix_hint).await?;
1229 match row {
1230 Some(row) => {
1231 if IS_REPLICATED {
1232 let row = row.project(&self.output_indices);
1235 Ok(Some(row.into_owned_row()))
1236 } else {
1237 Ok(Some(row))
1238 }
1239 }
1240 None => Ok(None),
1241 }
1242 }
1243
1244 pub async fn exists(&self, pk: impl Row) -> StreamExecutorResult<bool> {
1246 let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
1247 self.row_store.exists(serialized_pk, prefix_hint).await
1248 }
1249
1250 fn serialize_pk(&self, pk: &impl Row) -> TableKey<Bytes> {
1251 assert!(pk.len() <= self.pk_indices.len());
1252 serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk))
1253 }
1254
1255 fn serialize_pk_and_get_prefix_hint(&self, pk: &impl Row) -> (TableKey<Bytes>, Option<Bytes>) {
1256 let serialized_pk = self.serialize_pk(&pk);
1257 let prefix_hint = if should_calculate_prefix_hint(self.prefix_hint_len, pk.len(), false) {
1258 Some(serialized_pk.slice(VirtualNode::SIZE..))
1259 } else {
1260 #[cfg(debug_assertions)]
1261 if self.prefix_hint_len != 0 {
1262 warn!(
1263 "prefix_hint_len is not equal to pk.len(), may not be able to utilize bloom filter"
1264 );
1265 }
1266 None
1267 };
1268 (serialized_pk, prefix_hint)
1269 }
1270}
1271
1272impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
1273 async fn get(
1274 &self,
1275 key_bytes: TableKey<Bytes>,
1276 prefix_hint: Option<Bytes>,
1277 ) -> StreamExecutorResult<Option<OwnedRow>> {
1278 if let Some(m) = &self.metrics {
1279 m.get_count.inc();
1280 }
1281 if let Some(rows) = &self.all_rows {
1282 let (vnode, key) = key_bytes.split_vnode_bytes();
1283 return Ok(rows.get(&vnode).expect("covered vnode").get(&key).cloned());
1284 }
1285
1286 let should_prune = if let Some(stats) = &self.vnode_stats
1288 && let (vnode, key) = key_bytes.split_vnode_bytes()
1289 && let Some(vnode_stat) = stats.get(&vnode)
1290 && vnode_stat.can_prune(&key)
1291 {
1292 if let Some(m) = &self.metrics {
1293 m.get_vnode_pruned_count.inc();
1294 }
1295 true
1296 } else {
1297 false
1298 };
1299
1300 if should_prune && self.enable_state_table_vnode_stats_pruning {
1301 return Ok(None);
1302 }
1303
1304 let read_options = ReadOptions {
1305 prefix_hint,
1306 cache_policy: CachePolicy::Fill(Hint::Normal),
1307 ..Default::default()
1308 };
1309
1310 let result = self
1311 .state_store
1312 .on_key_value(key_bytes, read_options, move |_, value| {
1313 let row = self.row_serde.deserialize(value)?;
1314 Ok(OwnedRow::new(row))
1315 })
1316 .await
1317 .map_err(Into::<StreamExecutorError>::into)?;
1318
1319 if should_prune && result.is_some() {
1321 tracing::warn!(
1322 table_id = %self.table_id,
1323 "vnode stats pruning dry run fails for get. This will not affect correctness."
1324 );
1325 }
1326
1327 Ok(result)
1328 }
1329
1330 async fn exists(
1331 &self,
1332 key_bytes: TableKey<Bytes>,
1333 prefix_hint: Option<Bytes>,
1334 ) -> StreamExecutorResult<bool> {
1335 if let Some(m) = &self.metrics {
1336 m.get_count.inc();
1337 }
1338 if let Some(rows) = &self.all_rows {
1339 let (vnode, key) = key_bytes.split_vnode_bytes();
1340 return Ok(rows.get(&vnode).expect("covered vnode").contains_key(&key));
1341 }
1342
1343 let should_prune = if let Some(stats) = &self.vnode_stats
1345 && let (vnode, key) = key_bytes.split_vnode_bytes()
1346 && let Some(vnode_stat) = stats.get(&vnode)
1347 && vnode_stat.can_prune(&key)
1348 {
1349 if let Some(m) = &self.metrics {
1350 m.get_vnode_pruned_count.inc();
1351 }
1352 true
1353 } else {
1354 false
1355 };
1356
1357 if should_prune && self.enable_state_table_vnode_stats_pruning {
1358 return Ok(false);
1359 }
1360
1361 let read_options = ReadOptions {
1362 prefix_hint,
1363 cache_policy: CachePolicy::Fill(Hint::Normal),
1364 ..Default::default()
1365 };
1366 let result = self
1367 .state_store
1368 .on_key_value(key_bytes, read_options, move |_, _| Ok(()))
1369 .await?;
1370 let exists = result.is_some();
1371
1372 if should_prune && exists {
1374 tracing::warn!(
1375 table_id = %self.table_id,
1376 "vnode stats pruning dry run fails for exists. This will not affect correctness."
1377 );
1378 }
1379
1380 Ok(exists)
1381 }
1382}
1383
/// Handle handed out by the commit methods of [`StateTableInner`].
///
/// It holds the exclusive borrow of the state table after an epoch has been sealed and is
/// consumed by `post_yield_barrier`, which clears the table's `on_post_commit` flag and
/// optionally applies a new vnode bitmap. The type is `#[must_use]`, so dropping it
/// without calling `post_yield_barrier` triggers a compiler warning.
#[must_use]
pub struct StateTablePostCommit<'a, S, SD = BasicSerde, const IS_REPLICATED: bool = false>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// The state table whose commit is being finalized.
    inner: &'a mut StateTableInner<S, SD, IS_REPLICATED>,
}
1406
1407impl<'a, S, SD, const IS_REPLICATED: bool> StateTablePostCommit<'a, S, SD, IS_REPLICATED>
1408where
1409 S: StateStore,
1410 SD: ValueRowSerde,
1411{
1412 pub async fn post_yield_barrier(
1417 mut self,
1418 new_vnodes: Option<Arc<Bitmap>>,
1419 ) -> StreamExecutorResult<
1420 Option<(
1421 (
1422 Arc<Bitmap>,
1423 Arc<Bitmap>,
1424 &'a mut StateTableInner<S, SD, IS_REPLICATED>,
1425 ),
1426 bool,
1427 )>,
1428 > {
1429 self.inner.on_post_commit = false;
1430 Ok(if let Some(new_vnodes) = new_vnodes {
1431 let (old_vnodes, keyed_cache_may_stale) =
1432 self.update_vnode_bitmap(new_vnodes.clone()).await?;
1433 Some(((new_vnodes, old_vnodes, self.inner), keyed_cache_may_stale))
1434 } else {
1435 None
1436 })
1437 }
1438
1439 pub fn inner(&self) -> &StateTableInner<S, SD, IS_REPLICATED> {
1440 &*self.inner
1441 }
1442
1443 async fn update_vnode_bitmap(
1445 &mut self,
1446 new_vnodes: Arc<Bitmap>,
1447 ) -> StreamExecutorResult<(Arc<Bitmap>, bool)> {
1448 let prev_vnodes = self
1449 .inner
1450 .row_store
1451 .update_vnode_bitmap(new_vnodes.clone())
1452 .await?;
1453 assert_eq!(
1454 &prev_vnodes,
1455 self.inner.vnodes(),
1456 "state table and state store vnode bitmap mismatches"
1457 );
1458
1459 if self.inner.distribution.is_singleton() {
1460 assert_eq!(
1461 &new_vnodes,
1462 self.inner.vnodes(),
1463 "should not update vnode bitmap for singleton table"
1464 );
1465 }
1466 assert_eq!(self.inner.vnodes().len(), new_vnodes.len());
1467
1468 let keyed_cache_may_stale = keyed_cache_may_stale(self.inner.vnodes(), &new_vnodes);
1469
1470 if keyed_cache_may_stale {
1471 self.inner.pending_watermark = None;
1472 }
1473
1474 Ok((
1475 self.inner.distribution.update_vnode_bitmap(new_vnodes),
1476 keyed_cache_may_stale,
1477 ))
1478 }
1479}
1480
1481impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
1483 fn handle_mem_table_error(&self, e: StorageError) {
1484 let e = match e.into_inner() {
1485 ErrorKind::MemTable(e) => e,
1486 _ => unreachable!("should only get memtable error"),
1487 };
1488 match *e {
1489 MemTableError::InconsistentOperation { key, prev, new, .. } => {
1490 let (vnode, key) = deserialize_pk_with_vnode(&key, &self.pk_serde).unwrap();
1491 panic!(
1492 "mem-table operation inconsistent! table_id: {}, vnode: {}, key: {:?}, prev: {}, new: {}",
1493 self.table_id,
1494 vnode,
1495 &key,
1496 prev.debug_fmt(&*self.row_serde),
1497 new.debug_fmt(&*self.row_serde),
1498 )
1499 }
1500 }
1501 }
1502
1503 fn insert(&mut self, key: TableKey<Bytes>, value: impl Row) {
1504 insane_mode_discard_point!();
1505 let value_bytes = self.row_serde.serialize(&value).into();
1506
1507 let (vnode, key_without_vnode) = key.split_vnode_bytes();
1508
1509 if self.all_rows.is_none()
1511 && let Some(stats) = &mut self.vnode_stats
1512 && let Some(vnode_stat) = stats.get_mut(&vnode)
1513 {
1514 vnode_stat.update_with_key(&key_without_vnode);
1515 }
1516
1517 if let Some(rows) = &mut self.all_rows {
1518 rows.get_mut(&vnode)
1519 .expect("covered vnode")
1520 .insert(key_without_vnode, value.into_owned_row());
1521 }
1522 self.state_store
1523 .insert(key, value_bytes, None)
1524 .unwrap_or_else(|e| self.handle_mem_table_error(e));
1525 }
1526
1527 fn delete(&mut self, key: TableKey<Bytes>, value: impl Row) {
1528 insane_mode_discard_point!();
1529 let value_bytes = self.row_serde.serialize(value).into();
1530
1531 let (vnode, key_without_vnode) = key.split_vnode_bytes();
1532
1533 if self.all_rows.is_none()
1534 && let Some(stats) = &mut self.vnode_stats
1535 && let Some(vnode_stat) = stats.get_mut(&vnode)
1536 {
1537 vnode_stat.update_with_key(&key_without_vnode);
1538 }
1539
1540 if let Some(rows) = &mut self.all_rows {
1541 rows.get_mut(&vnode)
1542 .expect("covered vnode")
1543 .remove(&key_without_vnode);
1544 }
1545 self.state_store
1546 .delete(key, value_bytes)
1547 .unwrap_or_else(|e| self.handle_mem_table_error(e));
1548 }
1549
1550 fn update(&mut self, key_bytes: TableKey<Bytes>, old_value: impl Row, new_value: impl Row) {
1551 insane_mode_discard_point!();
1552 let new_value_bytes = self.row_serde.serialize(&new_value).into();
1553 let old_value_bytes = self.row_serde.serialize(old_value).into();
1554
1555 let (vnode, key_without_vnode) = key_bytes.split_vnode_bytes();
1556
1557 if self.all_rows.is_none()
1560 && let Some(stats) = &mut self.vnode_stats
1561 && let Some(vnode_stat) = stats.get_mut(&vnode)
1562 {
1563 vnode_stat.update_with_key(&key_without_vnode);
1564 }
1565
1566 if let Some(rows) = &mut self.all_rows {
1567 rows.get_mut(&vnode)
1568 .expect("covered vnode")
1569 .insert(key_without_vnode, new_value.into_owned_row());
1570 }
1571 self.state_store
1572 .insert(key_bytes, new_value_bytes, Some(old_value_bytes))
1573 .unwrap_or_else(|e| self.handle_mem_table_error(e));
1574 }
1575}
1576
1577impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
1578where
1579 S: StateStore,
1580 SD: ValueRowSerde,
1581{
1582 pub fn insert(&mut self, value: impl Row) {
1585 let pk_indices = &self.pk_indices;
1586 let pk = (&value).project(pk_indices);
1587
1588 let key_bytes = self.serialize_pk(&pk);
1589 dispatch_value_indices!(&self.value_indices, [value], {
1590 self.row_store.insert(key_bytes, value)
1591 })
1592 }
1593
1594 pub fn delete(&mut self, old_value: impl Row) {
1597 let pk_indices = &self.pk_indices;
1598 let pk = (&old_value).project(pk_indices);
1599
1600 let key_bytes = self.serialize_pk(&pk);
1601 dispatch_value_indices!(&self.value_indices, [old_value], {
1602 self.row_store.delete(key_bytes, old_value)
1603 })
1604 }
1605
1606 pub fn update(&mut self, old_value: impl Row, new_value: impl Row) {
1608 let old_pk = (&old_value).project(self.pk_indices());
1609 let new_pk = (&new_value).project(self.pk_indices());
1610 debug_assert!(
1611 Row::eq(&old_pk, new_pk),
1612 "pk should not change: {old_pk:?} vs {new_pk:?}. {}",
1613 self.table_id
1614 );
1615
1616 let key_bytes = self.serialize_pk(&new_pk);
1617 dispatch_value_indices!(&self.value_indices, [old_value, new_value], {
1618 self.row_store.update(key_bytes, old_value, new_value)
1619 })
1620 }
1621
1622 pub fn write_record(&mut self, record: Record<impl Row>) {
1624 match record {
1625 Record::Insert { new_row } => self.insert(new_row),
1626 Record::Delete { old_row } => self.delete(old_row),
1627 Record::Update { old_row, new_row } => self.update(old_row, new_row),
1628 }
1629 }
1630
1631 fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
1632 fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
1633 }
1634
1635 #[allow(clippy::disallowed_methods)]
1638 pub fn write_chunk(&mut self, chunk: StreamChunk) {
1639 let chunk = if IS_REPLICATED {
1640 self.fill_non_output_indices(chunk)
1641 } else {
1642 chunk
1643 };
1644
1645 let vnodes = self
1646 .distribution
1647 .compute_chunk_vnode(&chunk, &self.pk_indices);
1648
1649 for (idx, optional_row) in chunk.rows_with_holes().enumerate() {
1650 let Some((op, row)) = optional_row else {
1651 continue;
1652 };
1653 let pk = row.project(&self.pk_indices);
1654 let vnode = vnodes[idx];
1655 let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, vnode);
1656 match op {
1657 Op::Insert | Op::UpdateInsert => {
1658 dispatch_value_indices!(&self.value_indices, [row], {
1659 self.row_store.insert(key_bytes, row);
1660 });
1661 }
1662 Op::Delete | Op::UpdateDelete => {
1663 dispatch_value_indices!(&self.value_indices, [row], {
1664 self.row_store.delete(key_bytes, row);
1665 });
1666 }
1667 }
1668 }
1669 }
1670
1671 pub fn update_watermark(&mut self, watermark: ScalarImpl) {
1677 trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark");
1678 self.pending_watermark = Some(watermark);
1679 }
1680
1681 pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> {
1684 self.committed_watermark.as_ref()
1685 }
1686
1687 pub async fn commit(
1688 &mut self,
1689 new_epoch: EpochPair,
1690 ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
1691 self.commit_inner(new_epoch, None).await
1692 }
1693
1694 #[cfg(test)]
1695 pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
1696 self.commit_assert_no_update_vnode_bitmap(new_epoch).await
1697 }
1698
1699 pub async fn commit_assert_no_update_vnode_bitmap(
1700 &mut self,
1701 new_epoch: EpochPair,
1702 ) -> StreamExecutorResult<()> {
1703 let post_commit = self.commit_inner(new_epoch, None).await?;
1704 post_commit.post_yield_barrier(None).await?;
1705 Ok(())
1706 }
1707
1708 pub async fn commit_may_switch_consistent_op(
1709 &mut self,
1710 new_epoch: EpochPair,
1711 op_consistency_level: StateTableOpConsistencyLevel,
1712 ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
1713 if self.op_consistency_level != op_consistency_level {
1714 if !cfg!(debug_assertions) {
1716 info!(
1717 ?new_epoch,
1718 prev_op_consistency_level = ?self.op_consistency_level,
1719 ?op_consistency_level,
1720 table_id = %self.table_id,
1721 "switch to new op consistency level"
1722 );
1723 }
1724 self.commit_inner(new_epoch, Some(op_consistency_level))
1725 .await
1726 } else {
1727 self.commit_inner(new_epoch, None).await
1728 }
1729 }
1730
1731 async fn commit_inner(
1732 &mut self,
1733 new_epoch: EpochPair,
1734 switch_consistent_op: Option<StateTableOpConsistencyLevel>,
1735 ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
1736 assert!(!self.on_post_commit);
1737 assert_eq!(
1738 self.epoch.expect("should only be called after init").curr,
1739 new_epoch.prev
1740 );
1741 if let Some(new_consistency_level) = switch_consistent_op {
1742 assert_ne!(self.op_consistency_level, new_consistency_level);
1743 self.op_consistency_level = new_consistency_level;
1744 }
1745 trace!(
1746 table_id = %self.table_id,
1747 epoch = ?self.epoch,
1748 "commit state table"
1749 );
1750
1751 let table_watermarks = self.commit_pending_watermark();
1752 self.row_store
1753 .seal_current_epoch(new_epoch.curr, table_watermarks, switch_consistent_op)
1754 .await?;
1755 self.epoch = Some(new_epoch);
1756
1757 self.on_post_commit = true;
1758 Ok(StateTablePostCommit { inner: self })
1759 }
1760
1761 fn commit_pending_watermark(
1763 &mut self,
1764 ) -> Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)> {
1765 let watermark = self.pending_watermark.take()?;
1766 trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning");
1767
1768 assert!(
1769 !self.pk_indices().is_empty(),
1770 "see pending watermark on empty pk"
1771 );
1772 let (watermark_serializer, watermark_type) = self
1773 .watermark_serde
1774 .as_ref()
1775 .expect("watermark serde should be initialized to commit watermark");
1776 let watermark_suffix =
1777 serialize_row(row::once(Some(watermark.clone())), watermark_serializer);
1778 let vnode_watermark = VnodeWatermark::new(
1779 self.vnodes().clone(),
1780 Bytes::copy_from_slice(watermark_suffix.as_ref()),
1781 );
1782 trace!(table_id = %self.table_id, ?vnode_watermark, "table watermark");
1783
1784 let order_type = watermark_serializer.get_order_types().get(0).unwrap();
1785 let direction = if order_type.is_ascending() {
1786 WatermarkDirection::Ascending
1787 } else {
1788 WatermarkDirection::Descending
1789 };
1790
1791 self.committed_watermark = Some(watermark);
1792 Some((direction, vec![vnode_watermark], *watermark_type))
1793 }
1794
1795 pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
1796 self.row_store.try_flush().await?;
1797 Ok(())
1798 }
1799}
1800
/// Trait alias for a stream of `StreamExecutorResult<OwnedRow>` that lives for `'a`.
pub trait RowStream<'a>: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a {}
// Blanket impl: every matching stream is a `RowStream`.
impl<'a, S: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a> RowStream<'a> for S {}
1804
/// Trait alias for a stream of `StreamExecutorResult<KeyedRow<Bytes>>` that lives for `'a`.
pub trait KeyedRowStream<'a>: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a {}
// Blanket impl: every matching stream is a `KeyedRowStream`.
impl<'a, S: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a> KeyedRowStream<'a> for S {}
1807
/// Trait alias for a stream of `StreamExecutorResult<(K, OwnedRow)>` that lives for `'a`,
/// where `K` is the key representation paired with each row.
pub trait PkRowStream<'a, K>: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a {}
// Blanket impl: every matching stream is a `PkRowStream`.
impl<'a, K, S: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a> PkRowStream<'a, K> for S {}
1810
/// A boxed stream of `StreamExecutorResult<OwnedRow>`.
pub type BoxedRowStream<'a> = BoxStream<'a, StreamExecutorResult<OwnedRow>>;
1812
/// Builds a key representation from a vnode and the key bytes that follow it.
///
/// The row store's iterators use it to produce keys when rows are served from the
/// preloaded in-memory map, where keys are stored without their vnode.
pub trait FromVnodeBytes {
    /// Creates the key representation for `bytes` belonging to `vnode`.
    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self;
}

impl FromVnodeBytes for Bytes {
    /// Delegates to `prefix_slice_with_vnode`.
    // NOTE(review): presumably this prepends the vnode to `bytes`; confirm against the helper.
    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self {
        prefix_slice_with_vnode(vnode, bytes)
    }
}

impl FromVnodeBytes for () {
    /// Discards both inputs; used when the caller does not need the key.
    fn from_vnode_bytes(_vnode: VirtualNode, _bytes: &Bytes) -> Self {}
}
1826
1827impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
1829where
1830 S: StateStore,
1831 SD: ValueRowSerde,
1832{
1833 pub async fn iter_with_vnode(
1836 &self,
1837
1838 vnode: VirtualNode,
1842 pk_range: &(Bound<impl Row>, Bound<impl Row>),
1843 prefetch_options: PrefetchOptions,
1844 ) -> StreamExecutorResult<impl RowStream<'_>> {
1845 Ok(self
1846 .iter_kv_with_pk_range::<()>(pk_range, vnode, prefetch_options)
1847 .await?
1848 .map_ok(|(_, row)| {
1849 if IS_REPLICATED {
1850 row.project(&self.output_indices).into_owned_row()
1851 } else {
1852 row
1853 }
1854 }))
1855 }
1856
1857 pub async fn iter_keyed_row_with_vnode(
1858 &self,
1859 vnode: VirtualNode,
1860 pk_range: &(Bound<impl Row>, Bound<impl Row>),
1861 prefetch_options: PrefetchOptions,
1862 ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
1863 Ok(self
1864 .iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
1865 .await?
1866 .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
1867 }
1868}
1869
1870impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
1871 async fn iter_kv<K: CopyFromSlice + FromVnodeBytes>(
1876 &self,
1877 vnode: VirtualNode,
1878 (start, end): (Bound<Bytes>, Bound<Bytes>),
1879 prefix_hint: Option<Bytes>,
1880 prefetch_options: PrefetchOptions,
1881 ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
1882 if let Some(m) = &self.metrics {
1883 m.iter_count.inc();
1884 }
1885 let (pruned_start, pruned_end, should_prune_entirely) = if let Some(stats) =
1887 &self.vnode_stats
1888 && let Some(vnode_stat) = stats.get(&vnode)
1889 {
1890 match vnode_stat.pruned_key_range(&start, &end) {
1891 Some((new_start, new_end)) => {
1892 if self.enable_state_table_vnode_stats_pruning {
1893 (new_start, new_end, false)
1894 } else {
1895 (start, end, false)
1897 }
1898 }
1899 None => {
1900 if let Some(m) = &self.metrics {
1901 m.iter_vnode_pruned_count.inc();
1902 }
1903 (start.clone(), end.clone(), true)
1905 }
1906 }
1907 } else {
1908 (start, end, false)
1909 };
1910
1911 if should_prune_entirely && self.enable_state_table_vnode_stats_pruning {
1912 return Ok(futures::future::Either::Left(futures::stream::empty()));
1913 }
1914
1915 let table_id = self.table_id;
1916 let inspect_fn = move |result: &StreamExecutorResult<(K, OwnedRow)>| {
1917 if should_prune_entirely && result.is_ok() {
1919 tracing::warn!(
1920 table_id = %table_id,
1921 "vnode stats pruning dry run fails for iter. This will not affect correctness."
1922 );
1923 }
1924 };
1925
1926 if let Some(rows) = &self.all_rows {
1927 return Ok(futures::future::Either::Right(
1928 futures::future::Either::Left(
1929 futures::stream::iter(
1930 rows.get(&vnode)
1931 .expect("covered vnode")
1932 .range((pruned_start, pruned_end))
1933 .map(move |(key, value)| {
1934 Ok((K::from_vnode_bytes(vnode, key), value.clone()))
1935 }),
1936 )
1937 .inspect(inspect_fn),
1938 ),
1939 ));
1940 }
1941 let read_options = ReadOptions {
1942 prefix_hint,
1943 prefetch_options,
1944 cache_policy: CachePolicy::Fill(Hint::Normal),
1945 };
1946
1947 Ok(futures::future::Either::Right(
1948 futures::future::Either::Right(
1949 deserialize_keyed_row_stream(
1950 self.state_store
1951 .iter(
1952 prefixed_range_with_vnode((pruned_start, pruned_end), vnode),
1953 read_options,
1954 )
1955 .await?,
1956 &*self.row_serde,
1957 )
1958 .inspect(inspect_fn),
1959 ),
1960 ))
1961 }
1962
1963 async fn rev_iter_kv<K: CopyFromSlice + FromVnodeBytes>(
1964 &self,
1965 vnode: VirtualNode,
1966 (start, end): (Bound<Bytes>, Bound<Bytes>),
1967 prefix_hint: Option<Bytes>,
1968 prefetch_options: PrefetchOptions,
1969 ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
1970 if let Some(m) = &self.metrics {
1971 m.iter_count.inc();
1972 }
1973 let (pruned_start, pruned_end, should_prune_entirely) = if let Some(stats) =
1975 &self.vnode_stats
1976 && let Some(vnode_stat) = stats.get(&vnode)
1977 {
1978 match vnode_stat.pruned_key_range(&start, &end) {
1979 Some((new_start, new_end)) => {
1980 if self.enable_state_table_vnode_stats_pruning {
1981 (new_start, new_end, false)
1982 } else {
1983 (start, end, false)
1985 }
1986 }
1987 None => {
1988 if let Some(m) = &self.metrics {
1989 m.iter_vnode_pruned_count.inc();
1990 }
1991 (start, end, true)
1993 }
1994 }
1995 } else {
1996 (start, end, false)
1997 };
1998
1999 if should_prune_entirely && self.enable_state_table_vnode_stats_pruning {
2000 return Ok(futures::future::Either::Left(futures::stream::empty()));
2001 }
2002
2003 let table_id = self.table_id;
2004 let inspect_fn = move |result: &StreamExecutorResult<(K, OwnedRow)>| {
2005 if should_prune_entirely && result.is_ok() {
2007 tracing::warn!(
2008 table_id = %table_id,
2009 "vnode stats pruning dry run fails for rev_iter. This will not affect correctness."
2010 );
2011 }
2012 };
2013
2014 if let Some(rows) = &self.all_rows {
2015 return Ok(futures::future::Either::Right(
2016 futures::future::Either::Left(
2017 futures::stream::iter(
2018 rows.get(&vnode)
2019 .expect("covered vnode")
2020 .range((pruned_start, pruned_end))
2021 .rev()
2022 .map(move |(key, value)| {
2023 Ok((K::from_vnode_bytes(vnode, key), value.clone()))
2024 }),
2025 )
2026 .inspect(inspect_fn),
2027 ),
2028 ));
2029 }
2030 let read_options = ReadOptions {
2031 prefix_hint,
2032 prefetch_options,
2033 cache_policy: CachePolicy::Fill(Hint::Normal),
2034 };
2035
2036 Ok(futures::future::Either::Right(
2037 futures::future::Either::Right(
2038 deserialize_keyed_row_stream(
2039 self.state_store
2040 .rev_iter(
2041 prefixed_range_with_vnode((pruned_start, pruned_end), vnode),
2042 read_options,
2043 )
2044 .await?,
2045 &*self.row_serde,
2046 )
2047 .inspect(inspect_fn),
2048 ),
2049 ))
2050 }
2051}
2052
2053impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
2054where
2055 S: StateStore,
2056 SD: ValueRowSerde,
2057{
2058 pub async fn iter_with_prefix(
2062 &self,
2063 pk_prefix: impl Row,
2064 sub_range: &(Bound<impl Row>, Bound<impl Row>),
2065 prefetch_options: PrefetchOptions,
2066 ) -> StreamExecutorResult<impl RowStream<'_>> {
2067 let stream = self.iter_with_prefix_inner::<false, ()>(pk_prefix, sub_range, prefetch_options)
2068 .await?;
2069 Ok(stream.map_ok(|(_, row)| {
2070 if IS_REPLICATED {
2071 row.project(&self.output_indices).into_owned_row()
2072 } else {
2073 row
2074 }
2075 }))
2076 }
2077
2078 pub async fn iter_with_prefix_respecting_watermark(
2084 &self,
2085 pk_prefix: impl Row,
2086 sub_range: &(Bound<impl Row>, Bound<impl Row>),
2087 prefetch_options: PrefetchOptions,
2088 ) -> StreamExecutorResult<BoxedRowStream<'_>> {
2089 let vnode = self.compute_prefix_vnode(&pk_prefix);
2090 let Some(clean_watermark_index) = self.clean_watermark_index else {
2091 return self
2092 .iter_with_prefix(pk_prefix, sub_range, prefetch_options)
2093 .await
2094 .map(|s| s.boxed());
2095 };
2096 let Some((watermark_serde, watermark_type)) = &self.watermark_serde else {
2097 return Err(StreamExecutorError::from(anyhow!(
2098 "Missing watermark serde"
2099 )));
2100 };
2101 if matches!(watermark_type, WatermarkSerdeType::PkPrefix) {
2103 return self
2104 .iter_with_prefix(pk_prefix, sub_range, prefetch_options)
2105 .await
2106 .map(|s| s.boxed());
2107 }
2108
2109 let watermark_bytes = self.row_store.state_store.get_table_watermark(vnode);
2110 let Some(watermark_bytes) = watermark_bytes else {
2111 return self
2112 .iter_with_prefix(pk_prefix, sub_range, prefetch_options)
2113 .await
2114 .map(|s| s.boxed());
2115 };
2116 let watermark_row = watermark_serde.deserialize(&watermark_bytes)?;
2117 if watermark_row.len() != 1 {
2118 return Err(StreamExecutorError::from(format!(
2119 "Watermark row should have exactly 1 column, got {}",
2120 watermark_row.len()
2121 )));
2122 }
2123 let watermark_value = watermark_row[0].clone();
2124 if watermark_value.is_none() {
2126 return Err(StreamExecutorError::from(anyhow!(
2127 "Watermark cannot be NULL"
2128 )));
2129 }
2130 let order_type = watermark_serde.get_order_types().get(0).ok_or_else(|| {
2131 StreamExecutorError::from(anyhow!(
2132 "Watermark serde should have at least one order type"
2133 ))
2134 })?;
2135
2136 let direction = if order_type.is_ascending() {
2137 WatermarkDirection::Ascending
2138 } else {
2139 WatermarkDirection::Descending
2140 };
2141 let clean_watermark_index_in_pk = self
2142 .pk_indices
2143 .iter()
2144 .position(|&i| i == clean_watermark_index);
2145 let clean_watermark_index_in_value = match &self.value_indices {
2146 Some(value_indices) => value_indices
2147 .iter()
2148 .position(|idx| *idx == clean_watermark_index)
2149 .ok_or_else(|| {
2150 StreamExecutorError::from(anyhow!(
2151 "clean watermark column index {} is not included in table value indices {:?}",
2152 clean_watermark_index,
2153 value_indices
2154 ))
2155 })?,
2156 None => clean_watermark_index,
2157 };
2158
2159 let stream = self
2160 .iter_with_prefix_inner::<false, Bytes>(pk_prefix, sub_range, prefetch_options)
2161 .await?
2162 .try_filter_map(move |(pk, row)| {
2163 let should_filter = match watermark_type {
2164 WatermarkSerdeType::PkPrefix => unreachable!(),
2165 WatermarkSerdeType::NonPkPrefix => {
2166 let table_key = TableKey(pk);
2167 let (vnode, key) = table_key.split_vnode();
2168 let pk_cols = self.pk_serde
2169 .deserialize(key)
2170 .unwrap_or_else(|e| {
2171 panic!("Failed to deserialize table {} vnode {:?} key {:?} error: {:?}", self.table_id(), vnode, key, e.as_report());
2172 });
2173 direction.datum_filter_by_watermark(
2174 pk_cols.datum_at(clean_watermark_index_in_pk.unwrap()),
2175 &watermark_value,
2176 *order_type,
2177 )
2178 },
2179 WatermarkSerdeType::Value => {
2180 direction.datum_filter_by_watermark(
2181 row.datum_at(clean_watermark_index_in_value),
2182 &watermark_value,
2183 *order_type,
2184 )
2185 }
2186 };
2187 if should_filter {
2188 ready(Ok(None))
2189 } else {
2190 ready(Ok(Some(row)))
2191 }
2192 });
2193 Ok(stream.boxed())
2194 }
2195
2196 pub async fn get_from_one_row_table(&self) -> StreamExecutorResult<Option<OwnedRow>> {
2198 let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
2199 let stream = self
2200 .iter_with_prefix(row::empty(), sub_range, Default::default())
2201 .await?;
2202 pin_mut!(stream);
2203
2204 if let Some(res) = stream.next().await {
2205 let value = res?.into_owned_row();
2206 assert!(stream.next().await.is_none());
2207 Ok(Some(value))
2208 } else {
2209 Ok(None)
2210 }
2211 }
2212
2213 pub async fn get_from_one_value_table(&self) -> StreamExecutorResult<Option<ScalarImpl>> {
2218 Ok(self
2219 .get_from_one_row_table()
2220 .await?
2221 .and_then(|row| row[0].clone()))
2222 }
2223
2224 pub async fn iter_keyed_row_with_prefix(
2225 &self,
2226 pk_prefix: impl Row,
2227 sub_range: &(Bound<impl Row>, Bound<impl Row>),
2228 prefetch_options: PrefetchOptions,
2229 ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
2230 Ok(
2231 self.iter_with_prefix_inner::<false, Bytes>(pk_prefix, sub_range, prefetch_options)
2232 .await?.map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)),
2233 )
2234 }
2235
2236 pub async fn rev_iter_keyed_row_with_prefix(
2237 &self,
2238 pk_prefix: impl Row,
2239 sub_range: &(Bound<impl Row>, Bound<impl Row>),
2240 prefetch_options: PrefetchOptions,
2241 ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
2242 Ok(
2243 self.iter_with_prefix_inner::<true, Bytes>(pk_prefix, sub_range, prefetch_options)
2244 .await?.map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)),
2245 )
2246 }
2247
2248 pub async fn rev_iter_with_prefix(
2250 &self,
2251 pk_prefix: impl Row,
2252 sub_range: &(Bound<impl Row>, Bound<impl Row>),
2253 prefetch_options: PrefetchOptions,
2254 ) -> StreamExecutorResult<impl RowStream<'_>> {
2255 Ok(
2256 self.iter_with_prefix_inner::<true, ()>(pk_prefix, sub_range, prefetch_options)
2257 .await?.map_ok(|(_, row)| row),
2258 )
2259 }
2260
2261 async fn iter_with_prefix_inner<const REVERSE: bool, K: CopyFromSlice + FromVnodeBytes>(
2262 &self,
2263 pk_prefix: impl Row,
2264 sub_range: &(Bound<impl Row>, Bound<impl Row>),
2265 prefetch_options: PrefetchOptions,
2266 ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
2267 let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
2268 let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
2269
2270 let vnode = self.compute_prefix_vnode(&pk_prefix);
2274
2275 let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
2277 if self.prefix_hint_len != 0 && !IS_REPLICATED {
2278 debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
2279 }
2280 let prefix_hint = {
2281 if should_calculate_prefix_hint(self.prefix_hint_len, pk_prefix.len(), true) {
2282 let encoded_prefix_len = self
2283 .pk_serde
2284 .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;
2285
2286 Some(Bytes::copy_from_slice(
2287 &encoded_prefix[..encoded_prefix_len],
2288 ))
2289 } else {
2290 None
2291 }
2292 };
2293
2294 trace!(
2295 table_id = %self.table_id(),
2296 ?prefix_hint, ?pk_prefix,
2297 ?pk_prefix_indices,
2298 iter_direction = if REVERSE { "reverse" } else { "forward" },
2299 "storage_iter_with_prefix"
2300 );
2301
2302 let memcomparable_range =
2303 prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);
2304
2305 Ok(if REVERSE {
2306 futures::future::Either::Left(
2307 self.row_store
2308 .rev_iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
2309 .await?,
2310 )
2311 } else {
2312 futures::future::Either::Right(
2313 self.row_store
2314 .iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
2315 .await?,
2316 )
2317 })
2318 }
2319
2320 async fn iter_kv_with_pk_range<'a, K: CopyFromSlice + FromVnodeBytes>(
2323 &'a self,
2324 pk_range: &(Bound<impl Row>, Bound<impl Row>),
2325 vnode: VirtualNode,
2329 prefetch_options: PrefetchOptions,
2330 ) -> StreamExecutorResult<impl PkRowStream<'a, K>> {
2331 let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);
2332
2333 self.row_store
2335 .iter_kv(vnode, memcomparable_range, None, prefetch_options)
2336 .await
2337 }
2338}
2339
2340fn deserialize_keyed_row_stream<'a, K: CopyFromSlice>(
2341 iter: impl StateStoreIter + 'a,
2342 deserializer: &'a impl ValueRowSerde,
2343) -> impl PkRowStream<'a, K> {
2344 iter.into_stream(move |(key, value)| {
2345 Ok((
2346 K::copy_from_slice(key.user_key.table_key.as_ref()),
2347 deserializer.deserialize(value).map(OwnedRow::new)?,
2348 ))
2349 })
2350 .map_err(Into::into)
2351}
2352
2353pub fn prefix_range_to_memcomparable(
2354 pk_serde: &OrderedRowSerde,
2355 range: &(Bound<impl Row>, Bound<impl Row>),
2356) -> (Bound<Bytes>, Bound<Bytes>) {
2357 (
2358 start_range_to_memcomparable(pk_serde, &range.0),
2359 end_range_to_memcomparable(pk_serde, &range.1, None),
2360 )
2361}
2362
2363fn prefix_and_sub_range_to_memcomparable(
2364 pk_serde: &OrderedRowSerde,
2365 sub_range: &(Bound<impl Row>, Bound<impl Row>),
2366 pk_prefix: impl Row,
2367) -> (Bound<Bytes>, Bound<Bytes>) {
2368 let (range_start, range_end) = sub_range;
2369 let prefix_serializer = pk_serde.prefix(pk_prefix.len());
2370 let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
2371 let start_range = match range_start {
2372 Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))),
2373 Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))),
2374 Unbounded => Bound::Included(Either::Right(&pk_prefix)),
2375 };
2376 let end_range = match range_end {
2377 Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)),
2378 Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)),
2379 Unbounded => Unbounded,
2380 };
2381 (
2382 start_range_to_memcomparable(pk_serde, &start_range),
2383 end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
2384 )
2385}
2386
2387fn start_range_to_memcomparable<R: Row>(
2388 pk_serde: &OrderedRowSerde,
2389 bound: &Bound<R>,
2390) -> Bound<Bytes> {
2391 let serialize_pk_prefix = |pk_prefix: &R| {
2392 let prefix_serializer = pk_serde.prefix(pk_prefix.len());
2393 serialize_pk(pk_prefix, &prefix_serializer)
2394 };
2395 match bound {
2396 Unbounded => Unbounded,
2397 Included(r) => {
2398 let serialized = serialize_pk_prefix(r);
2399
2400 Included(serialized)
2401 }
2402 Excluded(r) => {
2403 let serialized = serialize_pk_prefix(r);
2404
2405 start_bound_of_excluded_prefix(&serialized)
2406 }
2407 }
2408}
2409
2410fn end_range_to_memcomparable<R: Row>(
2411 pk_serde: &OrderedRowSerde,
2412 bound: &Bound<R>,
2413 serialized_pk_prefix: Option<Bytes>,
2414) -> Bound<Bytes> {
2415 let serialize_pk_prefix = |pk_prefix: &R| {
2416 let prefix_serializer = pk_serde.prefix(pk_prefix.len());
2417 serialize_pk(pk_prefix, &prefix_serializer)
2418 };
2419 match bound {
2420 Unbounded => match serialized_pk_prefix {
2421 Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix),
2422 None => Unbounded,
2423 },
2424 Included(r) => {
2425 let serialized = serialize_pk_prefix(r);
2426 end_bound_of_prefix(&serialized)
2428 }
2429 Excluded(r) => {
2430 let serialized = serialize_pk_prefix(r);
2431 Excluded(serialized)
2432 }
2433 }
2434}
2435
2436fn fill_non_output_indices(
2437 i2o_mapping: &ColIndexMapping,
2438 data_types: &[DataType],
2439 chunk: StreamChunk,
2440) -> StreamChunk {
2441 let cardinality = chunk.cardinality();
2442 let (ops, columns, vis) = chunk.into_inner();
2443 let mut full_columns = Vec::with_capacity(data_types.len());
2444 for (i, data_type) in data_types.iter().enumerate() {
2445 if let Some(j) = i2o_mapping.try_map(i) {
2446 full_columns.push(columns[j].clone());
2447 } else {
2448 let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
2449 column_builder.append_n_null(cardinality);
2450 let column: ArrayRef = column_builder.finish().into();
2451 full_columns.push(column)
2452 }
2453 }
2454 let data_chunk = DataChunk::new(full_columns, vis);
2455 StreamChunk::from_parts(ops, data_chunk)
2456}
2457
#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    /// Compares the pretty `Debug` rendering of `actual` against the `expect` snapshot.
    fn check(actual: impl Debug, expect: Expect) {
        expect.assert_eq(&format!("{:#?}", actual));
    }

    /// A two-column chunk is widened to a three-column schema; the unmapped middle column
    /// must come back as NULL while the mapped columns land at their full-schema positions.
    #[test]
    fn test_fill_non_output_indices() {
        let full_schema = vec![DataType::Int32, DataType::Int32, DataType::Int32];
        let narrow_schema = [DataType::Int32, DataType::Int32];

        let rows = [OwnedRow::new(vec![
            Some(222_i32.into()),
            Some(2_i32.into()),
        ])];
        let narrow_chunk =
            StreamChunk::from_parts(vec![Op::Insert], DataChunk::from_rows(&rows, &narrow_schema));

        // Full-schema column 0 -> input column 1, column 1 -> none, column 2 -> input column 0.
        let mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);

        let widened = fill_non_output_indices(&mapping, &full_schema, narrow_chunk);
        check(
            widened,
            expect![[r#"
                StreamChunk { cardinality: 1, capacity: 1, data:
                +---+---+---+-----+
                | + | 2 | | 222 |
                +---+---+---+-----+
                }"#]],
        );
    }
}