use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;
use std::ops::Bound;
use std::ops::Bound::*;
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use either::Either;
use foyer::Hint;
use futures::future::try_join_all;
use futures::{Stream, StreamExt, TryStreamExt, pin_mut};
use itertools::Itertools;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnDesc, ColumnId, TableId, TableOption, get_dist_key_in_pk_indices,
};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
use risingwave_common::row::{self, OwnedRow, Row, RowExt};
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{
    CopyFromSlice, TableKey, end_bound_of_prefix, next_key, prefix_slice_with_vnode,
    prefixed_range_with_vnode, start_bound_of_excluded_prefix,
};
use risingwave_hummock_sdk::table_watermark::{
    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_pb::catalog::Table;
use risingwave_storage::StateStore;
use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::mem_table::MemTableError;
use risingwave_storage::row_serde::find_columns_by_ids;
use risingwave_storage::row_serde::row_serde_util::{
    deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode,
};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::*;
use risingwave_storage::table::{KeyedRow, TableDistribution};
use thiserror_ext::AsReport;
use tracing::{Instrument, trace};

use crate::cache::cache_may_stale;
use crate::executor::StreamExecutorResult;
use crate::executor::monitor::streaming_stats::StateTableMetrics;

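/// When the "insane" consistency-test mode is enabled, randomly discard roughly
/// 30% of write operations before they reach the mem-table, to exercise the
/// handling of inconsistent state.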
macro_rules! insane_mode_discard_point {
    () => {{
        use rand::Rng;
        if crate::consistency::insane() && rand::rng().random_bool(0.3) {
            return;
        }
    }};
}

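/// Per-vnode min/max statistics over the serialized key space (with the vnode
/// prefix stripped). The bounds are conservative: writes only ever widen them,
/// so a key or range falling outside the bounds is guaranteed absent and the
/// read can be pruned.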
struct VnodeStatistics {
    min_key: Option<Bytes>,
    max_key: Option<Bytes>,
}

impl VnodeStatistics {
    fn new() -> Self {
        Self {
            min_key: None,
            max_key: None,
        }
    }

    fn update_with_key(&mut self, key: &Bytes) {
        if let Some(min) = &self.min_key {
            if key < min {
                self.min_key = Some(key.clone());
            }
        } else {
            self.min_key = Some(key.clone());
        }

        if let Some(max) = &self.max_key {
            if key > max {
                self.max_key = Some(key.clone());
            }
        } else {
            self.max_key = Some(key.clone());
        }
    }

    fn can_prune(&self, key: &Bytes) -> bool {
        if let Some(min) = &self.min_key
            && key < min
        {
            return true;
        }
        if let Some(max) = &self.max_key
            && key > max
        {
            return true;
        }
        false
    }

    fn can_prune_range(&self, start: &Bound<Bytes>, end: &Bound<Bytes>) -> bool {
        if let Some(max) = &self.max_key {
            match start {
                Included(s) if s > max => return true,
                Excluded(s) if s >= max => return true,
                _ => {}
            }
        }
        if let Some(min) = &self.min_key {
            match end {
                Included(e) if e < min => return true,
                Excluded(e) if e <= min => return true,
                _ => {}
            }
        }
        false
    }

    fn pruned_key_range(
        &self,
        start: &Bound<Bytes>,
        end: &Bound<Bytes>,
    ) -> Option<(Bound<Bytes>, Bound<Bytes>)> {
        if self.can_prune_range(start, end) {
            return None;
        }
        let new_start = if let Some(min) = &self.min_key {
            match start {
                Included(s) if s <= min => Included(min.clone()),
                Excluded(s) if s < min => Included(min.clone()),
                _ => start.clone(),
            }
        } else {
            start.clone()
        };

        let new_end = if let Some(max) = &self.max_key {
            match end {
                Included(e) if e >= max => Included(max.clone()),
                Excluded(e) if e > max => Included(max.clone()),
                _ => end.clone(),
            }
        } else {
            end.clone()
        };

        Some((new_start, new_end))
    }
}

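/// State table of a streaming executor: a pk-indexed, vnode-partitioned view over
/// a [`StateStore`], handling pk/value (de)serialization, watermark-based state
/// cleaning, and optional in-memory row preloading and vnode-based key pruning.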
pub struct StateTableInner<S, SD = BasicSerde, const IS_REPLICATED: bool = false>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    table_id: TableId,

    row_store: StateTableRowStore<S::Local, SD>,

    store: S,

    epoch: Option<EpochPair>,

    pk_serde: OrderedRowSerde,

    pk_indices: Vec<usize>,

    distribution: TableDistribution,

    prefix_hint_len: usize,

    value_indices: Option<Vec<usize>>,

    pub clean_watermark_index: Option<usize>,
    pending_watermark: Option<ScalarImpl>,
    committed_watermark: Option<ScalarImpl>,
    watermark_serde: Option<(OrderedRowSerde, WatermarkSerdeType)>,

    data_types: Vec<DataType>,

    i2o_mapping: ColIndexMapping,

    pub output_indices: Vec<usize>,

    op_consistency_level: StateTableOpConsistencyLevel,

    on_post_commit: bool,
}

pub type StateTable<S> = StateTableInner<S, BasicSerde>;
pub type ReplicatedStateTable<S, SD> = StateTableInner<S, SD, true>;

impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
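    /// Initialize the state table with the given epoch pair. Must be called exactly
    /// once before any read or write; also loads the in-memory row copy and the
    /// vnode statistics when those features are enabled.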
    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.row_store
            .init(epoch, self.distribution.vnodes())
            .await?;
        assert_eq!(None, self.epoch.replace(epoch), "should not init twice");
        Ok(())
    }

    pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
        self.store
            .try_wait_epoch(
                HummockReadEpoch::Committed(prev_epoch),
                TryWaitEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await
    }

    pub fn state_store(&self) -> &S {
        &self.store
    }
}

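/// Build an [`OpConsistencyLevel`] whose old-value check first compares the raw
/// bytes and, on mismatch, falls back to comparing the deserialized rows, so that
/// different encodings of the same row still pass the sanity check.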
fn consistent_old_value_op(
    row_serde: Arc<impl ValueRowSerde>,
    is_log_store: bool,
) -> OpConsistencyLevel {
    OpConsistencyLevel::ConsistentOldValue {
        check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
            if first == second {
                return true;
            }
            let first = match row_serde.deserialize(first) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?first, "failed to deserialize value");
                    return false;
                }
            };
            let second = match row_serde.deserialize(second) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?second, "failed to deserialize value");
                    return false;
                }
            };
            if first != second {
                error!(first = ?first, second = ?second, "sanity check failed");
                false
            } else {
                true
            }
        }),
        is_log_store,
    }
}

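/// Project the given row variables onto `value_indices` when a projection is
/// configured, then evaluate the body; otherwise evaluate the body on the full rows.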
macro_rules! dispatch_value_indices {
    ($value_indices:expr, [$($row_var_name:ident),+], $body:expr) => {
        if let Some(value_indices) = $value_indices {
            $(
                let $row_var_name = $row_var_name.project(value_indices);
            )+
            $body
        } else {
            $body
        }
    };
}

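/// Storage layer beneath [`StateTableInner`]: wraps the local state store together
/// with the optional in-memory copy of all rows (`all_rows`) and the optional
/// per-vnode key statistics (`vnode_stats`) used for read pruning.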
struct StateTableRowStore<LS: LocalStateStore, SD: ValueRowSerde> {
    state_store: LS,
    all_rows: Option<HashMap<VirtualNode, BTreeMap<Bytes, OwnedRow>>>,

    table_id: TableId,
    table_option: TableOption,
    row_serde: Arc<SD>,
    pk_serde: OrderedRowSerde,

    vnode_stats: Option<HashMap<VirtualNode, VnodeStatistics>>,
    pub metrics: Option<StateTableMetrics>,
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
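    /// When vnode-based key pruning is enabled, initialize the per-vnode min/max key
    /// statistics by running one forward and one reverse iterator per covered vnode.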
    async fn may_load_vnode_stats(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        if self.vnode_stats.is_none() {
            return Ok(());
        }

        assert!(self.all_rows.is_none());

        let start_time = Instant::now();
        let mut stats_map = HashMap::new();

        for vnode in vnode_bitmap.iter_vnodes() {
            let mut stats = VnodeStatistics::new();

            let memcomparable_range_with_vnode = prefixed_range_with_vnode::<Bytes>(.., vnode);
            let read_options = ReadOptions {
                retention_seconds: self.table_option.retention_seconds,
                cache_policy: CachePolicy::Fill(Hint::Low),
                ..Default::default()
            };

            let mut iter = self
                .state_store
                .iter(memcomparable_range_with_vnode.clone(), read_options.clone())
                .await?;
            if let Some(item) = iter.try_next().await? {
                let (key_vnode, key_without_vnode) = item.0.user_key.table_key.split_vnode();
                assert_eq!(vnode, key_vnode);
                stats.min_key = Some(Bytes::copy_from_slice(key_without_vnode));
            }

            let mut rev_iter = self
                .state_store
                .rev_iter(memcomparable_range_with_vnode, read_options)
                .await?;
            if let Some(item) = rev_iter.try_next().await? {
                let (key_vnode, key_without_vnode) = item.0.user_key.table_key.split_vnode();
                assert_eq!(vnode, key_vnode);
                stats.max_key = Some(Bytes::copy_from_slice(key_without_vnode));
            }

            stats_map.insert(vnode, stats);
        }

        self.vnode_stats = Some(stats_map);

        if !cfg!(debug_assertions) {
            info!(
                table_id = %self.table_id,
                vnode_count = vnode_bitmap.count_ones(),
                duration = ?start_time.elapsed(),
                "finished initializing vnode statistics"
            );
        }

        Ok(())
    }

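    /// When row preloading is enabled, clear and reload the in-memory copy of all
    /// rows for the covered vnodes from the state store, one iterator per vnode.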
    async fn may_reload_all_rows(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        if let Some(rows) = &mut self.all_rows {
            rows.clear();
            let start_time = Instant::now();
            *rows = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| {
                let state_store = &self.state_store;
                let retention_seconds = self.table_option.retention_seconds;
                let row_serde = &self.row_serde;
                async move {
                    let mut rows = BTreeMap::new();
                    let memcomparable_range_with_vnode =
                        prefixed_range_with_vnode::<Bytes>(.., vnode);
                    let stream = deserialize_keyed_row_stream::<Bytes>(
                        state_store
                            .iter(
                                memcomparable_range_with_vnode,
                                ReadOptions {
                                    prefix_hint: None,
                                    prefetch_options: Default::default(),
                                    cache_policy: Default::default(),
                                    retention_seconds,
                                },
                            )
                            .await?,
                        &**row_serde,
                    );
                    pin_mut!(stream);
                    while let Some((encoded_key, row)) = stream.try_next().await? {
                        let key = TableKey(encoded_key);
                        let (iter_vnode, key) = key.split_vnode_bytes();
                        assert_eq!(vnode, iter_vnode);
                        rows.try_insert(key, row).expect("non-duplicated");
                    }
                    Ok((vnode, rows)) as StreamExecutorResult<_>
                }
            }))
            .await?
            .into_iter()
            .collect();
            if !cfg!(debug_assertions) {
                info!(
                    table_id = %self.table_id,
                    vnode_count = vnode_bitmap.count_ones(),
                    duration = ?start_time.elapsed(),
                    "finished reloading all rows"
                );
            }
        }
        Ok(())
    }

    async fn init(&mut self, epoch: EpochPair, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        self.state_store.init(InitOptions::new(epoch)).await?;
        self.may_reload_all_rows(vnode_bitmap).await?;
        self.may_load_vnode_stats(vnode_bitmap).await
    }

    async fn update_vnode_bitmap(
        &mut self,
        vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<Arc<Bitmap>> {
        let prev_vnodes = self.state_store.update_vnode_bitmap(vnodes.clone()).await?;
        self.may_reload_all_rows(&vnodes).await?;
        self.may_load_vnode_stats(&vnodes).await?;

        Ok(prev_vnodes)
    }

    async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.state_store.try_flush().await?;
        Ok(())
    }

    async fn seal_current_epoch(
        &mut self,
        next_epoch: u64,
        table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<()> {
        if let Some((direction, watermarks, serde_type)) = &table_watermarks
            && let Some(rows) = &mut self.all_rows
        {
            match serde_type {
                WatermarkSerdeType::PkPrefix => {
                    for vnode_watermark in watermarks {
                        match direction {
                            WatermarkDirection::Ascending => {
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
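                                    // Keep only keys >= watermark: `split_off`
                                    // returns the retained upper part of the map.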
                                    *rows = rows.split_off(vnode_watermark.watermark());
                                }
                            }
                            WatermarkDirection::Descending => {
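                                // Keep only keys <= watermark: split at
                                // `next_key(watermark)` and drop the upper part.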
                                let split_off_key = next_key(vnode_watermark.watermark());
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    rows.split_off(split_off_key.as_slice());
                                }
                            }
                        }
                    }
                }
                WatermarkSerdeType::NonPkPrefix => {
                    warn!(table_id = %self.table_id, "row preloading is disabled because a non-pk-prefix watermark was written");
                    self.all_rows = None;
                }
            }
        }
        self.state_store
            .flush()
            .instrument(tracing::info_span!("state_table_flush"))
            .await?;
        let switch_op_consistency_level =
            switch_consistent_op.map(|new_consistency_level| match new_consistency_level {
                StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
                StateTableOpConsistencyLevel::ConsistentOldValue => {
                    consistent_old_value_op(self.row_serde.clone(), false)
                }
                StateTableOpConsistencyLevel::LogStoreEnabled => {
                    consistent_old_value_op(self.row_serde.clone(), true)
                }
            });
        self.state_store.seal_current_epoch(
            next_epoch,
            SealCurrentEpochOptions {
                table_watermarks,
                switch_op_consistency_level,
            },
        );
        Ok(())
    }
}

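/// Consistency level of write operations applied to the state table.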
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
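    /// No consistency check on write ops: old values passed to delete/update are
    /// not verified against the stored rows.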
    Inconsistent,
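    /// Old values passed to delete/update must match the stored rows, enforced via
    /// [`consistent_old_value_op`].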
    ConsistentOldValue,
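    /// Same old-value check as `ConsistentOldValue`, additionally marking the table
    /// as log-store enabled (`is_log_store = true` in the consistency check).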
    LogStoreEnabled,
}

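/// Builder for [`StateTableInner`]. The `PreloadAllRow` type-state parameter forces
/// callers to decide on in-memory row preloading, via
/// [`StateTableBuilder::enable_preload_all_rows_by_config`] or
/// [`StateTableBuilder::forbid_preload_all_rows`], before `build()` becomes
/// available.
///
/// A minimal usage sketch (assuming `table_catalog`, `store`, and `vnodes` are in
/// scope):
///
/// ```ignore
/// let table = StateTableBuilder::new(&table_catalog, store, vnodes)
///     .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
///     .forbid_preload_all_rows()
///     .build()
///     .await;
/// ```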
pub struct StateTableBuilder<'a, S, SD, const IS_REPLICATED: bool, PreloadAllRow> {
    table_catalog: &'a Table,
    store: S,
    vnodes: Option<Arc<Bitmap>>,
    op_consistency_level: Option<StateTableOpConsistencyLevel>,
    output_column_ids: Option<Vec<ColumnId>>,
    preload_all_rows: PreloadAllRow,
    enable_vnode_key_pruning: Option<bool>,
    metrics: Option<StateTableMetrics>,

    _serde: PhantomData<SD>,
}

impl<'a, S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
    StateTableBuilder<'a, S, SD, IS_REPLICATED, ()>
{
    pub fn new(table_catalog: &'a Table, store: S, vnodes: Option<Arc<Bitmap>>) -> Self {
        Self {
            table_catalog,
            store,
            vnodes,
            op_consistency_level: None,
            output_column_ids: None,
            preload_all_rows: (),
            enable_vnode_key_pruning: None,
            metrics: None,
            _serde: Default::default(),
        }
    }

    fn with_preload_all_rows(
        self,
        preload_all_rows: bool,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, bool> {
        StateTableBuilder {
            table_catalog: self.table_catalog,
            store: self.store,
            vnodes: self.vnodes,
            op_consistency_level: self.op_consistency_level,
            output_column_ids: self.output_column_ids,
            preload_all_rows,
            enable_vnode_key_pruning: self.enable_vnode_key_pruning,
            metrics: self.metrics,
            _serde: Default::default(),
        }
    }

    pub fn enable_preload_all_rows_by_config(
        self,
        config: &StreamingConfig,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, bool> {
        let developer = &config.developer;
        let preload_all_rows = if developer.default_enable_mem_preload_state_table {
            !developer
                .mem_preload_state_table_ids_blacklist
                .contains(&self.table_catalog.id.as_raw_id())
        } else {
            developer
                .mem_preload_state_table_ids_whitelist
                .contains(&self.table_catalog.id.as_raw_id())
        };
        self.with_preload_all_rows(preload_all_rows)
    }

    pub fn forbid_preload_all_rows(self) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, bool> {
        self.with_preload_all_rows(false)
    }
}

impl<'a, S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool, PreloadAllRow>
    StateTableBuilder<'a, S, SD, IS_REPLICATED, PreloadAllRow>
{
    pub fn with_op_consistency_level(
        mut self,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> Self {
        self.op_consistency_level = Some(op_consistency_level);
        self
    }

    pub fn with_output_column_ids(mut self, output_column_ids: Vec<ColumnId>) -> Self {
        self.output_column_ids = Some(output_column_ids);
        self
    }

    pub fn enable_vnode_key_pruning(mut self, enable: bool) -> Self {
        self.enable_vnode_key_pruning = Some(enable);
        self
    }

    pub fn with_metrics(mut self, metrics: StateTableMetrics) -> Self {
        self.metrics = Some(metrics);
        self
    }
}

impl<'a, S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
    StateTableBuilder<'a, S, SD, IS_REPLICATED, bool>
{
    pub async fn build(self) -> StateTableInner<S, SD, IS_REPLICATED> {
        let mut preload_all_rows = self.preload_all_rows;
        if preload_all_rows
            && let Err(e) =
                risingwave_common::license::Feature::StateTableMemoryPreload.check_available()
        {
            warn!(table_id = %self.table_catalog.id, e = %e.as_report(), "table is configured to preload rows into memory, but the feature is not available under the current license");
            preload_all_rows = false;
        }

        let should_enable_vnode_key_pruning = if preload_all_rows
            && let Some(enable_vnode_key_pruning) = self.enable_vnode_key_pruning
            && enable_vnode_key_pruning
        {
            false
        } else {
            self.enable_vnode_key_pruning.unwrap_or(false)
        };

        StateTableInner::from_table_catalog_inner(
            self.table_catalog,
            self.store,
            self.vnodes,
            self.op_consistency_level
                .unwrap_or(StateTableOpConsistencyLevel::ConsistentOldValue),
            self.output_column_ids.unwrap_or_default(),
            preload_all_rows,
            should_enable_vnode_key_pruning,
            self.metrics,
        )
        .await
    }
}

impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    #[cfg(any(test, feature = "test"))]
    pub async fn from_table_catalog(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    pub async fn from_table_catalog_inconsistent_op(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    #[allow(clippy::too_many_arguments)]
    async fn from_table_catalog_inner(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        op_consistency_level: StateTableOpConsistencyLevel,
        output_column_ids: Vec<ColumnId>,
        preload_all_rows: bool,
        enable_vnode_key_pruning: bool,
        metrics: Option<StateTableMetrics>,
    ) -> Self {
        let table_id = table_catalog.id;
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();
        let data_types: Vec<DataType> = table_catalog
            .columns
            .iter()
            .map(|col| {
                col.get_column_desc()
                    .unwrap()
                    .get_column_type()
                    .unwrap()
                    .into()
            })
            .collect();
        let order_types: Vec<OrderType> = table_catalog
            .pk
            .iter()
            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
            .collect();
        let dist_key_indices: Vec<usize> = table_catalog
            .distribution_key
            .iter()
            .map(|dist_index| *dist_index as usize)
            .collect();

        let pk_indices = table_catalog
            .pk
            .iter()
            .map(|col_order| col_order.column_index as usize)
            .collect_vec();

        let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() {
            get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices).unwrap()
        } else {
            table_catalog
                .get_dist_key_in_pk()
                .iter()
                .map(|idx| *idx as usize)
                .collect()
        };

        let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
            let vnode_col_idx = *idx as usize;
            pk_indices.iter().position(|&i| vnode_col_idx == i)
        });

        let distribution =
            TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
        assert_eq!(
            distribution.vnode_count(),
            table_catalog.vnode_count(),
            "vnode count mismatch, scanning table {} under wrong distribution?",
            table_catalog.name,
        );

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);

        let input_value_indices = table_catalog
            .value_indices
            .iter()
            .map(|val| *val as usize)
            .collect_vec();

        let no_shuffle_value_indices = (0..table_columns.len()).collect_vec();

        let value_indices = match input_value_indices.len() == table_columns.len()
            && input_value_indices == no_shuffle_value_indices
        {
            true => None,
            false => Some(input_value_indices),
        };
        let prefix_hint_len = table_catalog.read_prefix_len_hint as usize;

        let row_serde = Arc::new(SD::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.clone().into_boxed_slice()),
        ));

        let state_table_op_consistency_level = op_consistency_level;
        let op_consistency_level = match op_consistency_level {
            StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
            StateTableOpConsistencyLevel::ConsistentOldValue => {
                consistent_old_value_op(row_serde.clone(), false)
            }
            StateTableOpConsistencyLevel::LogStoreEnabled => {
                consistent_old_value_op(row_serde.clone(), true)
            }
        };

        let table_option = TableOption::new(table_catalog.retention_seconds);
        let new_local_options = if IS_REPLICATED {
            NewLocalOptions::new_replicated(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
            )
        } else {
            NewLocalOptions::new(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
                true,
            )
        };
        let local_state_store = store.new_local(new_local_options).await;

        assert_eq!(
            table_catalog.version.is_some(),
            row_serde.kind().is_column_aware()
        );

        let output_column_ids_to_input_idx = output_column_ids
            .iter()
            .enumerate()
            .map(|(pos, id)| (*id, pos))
            .collect::<HashMap<_, _>>();

        let columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|c| c.column_desc.as_ref().unwrap().into())
            .collect_vec();

        let mut i2o_mapping = vec![None; columns.len()];
        for (i, column) in columns.iter().enumerate() {
            if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
                i2o_mapping[i] = Some(*pos);
            }
        }
        let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());

        let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);

        let clean_watermark_indices = table_catalog.get_clean_watermark_column_indices();
        if clean_watermark_indices.len() > 1 {
            unimplemented!("multiple clean watermark columns are not supported yet")
        }
        let clean_watermark_index = clean_watermark_indices.first().map(|&i| i as usize);

        let watermark_serde = clean_watermark_index.map(|idx| {
            let pk_idx = pk_indices.iter().position(|&i| i == idx);
            let (watermark_serde, watermark_serde_type) = match pk_idx {
                Some(0) => (pk_serde.index(0).into_owned(), WatermarkSerdeType::PkPrefix),
                Some(pk_idx) => (
                    pk_serde.index(pk_idx).into_owned(),
                    WatermarkSerdeType::NonPkPrefix,
                ),
                None => (
                    OrderedRowSerde::new(
                        vec![data_types[idx].clone()],
                        vec![OrderType::ascending()],
                    ),
                    WatermarkSerdeType::NonPkPrefix,
                ),
            };
            (watermark_serde, watermark_serde_type)
        });

        let max_watermark_of_vnodes = distribution
            .vnodes()
            .iter_vnodes()
            .filter_map(|vnode| local_state_store.get_table_watermark(vnode))
            .max();
        let committed_watermark = if let Some((deser, WatermarkSerdeType::PkPrefix)) =
            watermark_serde.as_ref()
            && let Some(max_watermark) = max_watermark_of_vnodes
        {
            let deserialized = deser.deserialize(&max_watermark).ok().and_then(|row| {
                assert_eq!(row.len(), 1);
                row[0].clone()
            });
            if deserialized.is_none() {
                tracing::error!(
                    vnodes = ?distribution.vnodes(),
                    watermark = ?max_watermark,
                    "Failed to deserialize persisted watermark from state store.",
                );
            }
            deserialized
        } else {
            None
        };

        Self {
            table_id,
            row_store: StateTableRowStore {
                all_rows: preload_all_rows.then(HashMap::new),
                table_option,
                state_store: local_state_store,
                row_serde,
                pk_serde: pk_serde.clone(),
                table_id,
                vnode_stats: enable_vnode_key_pruning.then(HashMap::new),
                metrics,
            },
            store,
            epoch: None,
            pk_serde,
            pk_indices,
            distribution,
            prefix_hint_len,
            value_indices,
            pending_watermark: None,
            committed_watermark,
            watermark_serde,
            data_types,
            output_indices,
            i2o_mapping,
            op_consistency_level: state_table_op_consistency_level,
            clean_watermark_index,
            on_post_commit: false,
        }
    }

    pub fn get_data_types(&self) -> &[DataType] {
        &self.data_types
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    fn compute_prefix_vnode(&self, pk_prefix: &impl Row) -> VirtualNode {
        self.distribution
            .try_compute_vnode_by_pk_prefix(pk_prefix)
            .expect("For streaming, the given prefix must be enough to calculate the vnode")
    }

    pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
        self.distribution.compute_vnode_by_pk(pk)
    }

    pub fn pk_indices(&self) -> &[usize] {
        &self.pk_indices
    }

    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
        assert!(IS_REPLICATED);
        self.pk_indices
            .iter()
            .map(|&i| self.output_indices.iter().position(|&j| i == j))
            .collect()
    }

    pub fn pk_serde(&self) -> &OrderedRowSerde {
        &self.pk_serde
    }

    pub fn vnodes(&self) -> &Arc<Bitmap> {
        self.distribution.vnodes()
    }

    pub fn value_indices(&self) -> &Option<Vec<usize>> {
        &self.value_indices
    }

    pub fn is_consistent_op(&self) -> bool {
        matches!(
            self.op_consistency_level,
            StateTableOpConsistencyLevel::ConsistentOldValue
                | StateTableOpConsistencyLevel::LogStoreEnabled
        )
    }

    pub fn metrics(&self) -> Option<&StateTableMetrics> {
        self.row_store.metrics.as_ref()
    }
}

impl<S, SD> StateTableInner<S, SD, true>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub async fn new_replicated(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .with_output_column_ids(output_column_ids)
            .forbid_preload_all_rows()
            .build()
            .await
    }
}

impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        let row = self.row_store.get(serialized_pk, prefix_hint).await?;
        match row {
            Some(row) => {
                if IS_REPLICATED {
                    let row = row.project(&self.output_indices);
                    Ok(Some(row.into_owned_row()))
                } else {
                    Ok(Some(row))
                }
            }
            None => Ok(None),
        }
    }

    pub async fn exists(&self, pk: impl Row) -> StreamExecutorResult<bool> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        self.row_store.exists(serialized_pk, prefix_hint).await
    }

    fn serialize_pk(&self, pk: &impl Row) -> TableKey<Bytes> {
        assert!(pk.len() <= self.pk_indices.len());
        serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk))
    }

    fn serialize_pk_and_get_prefix_hint(&self, pk: &impl Row) -> (TableKey<Bytes>, Option<Bytes>) {
        let serialized_pk = self.serialize_pk(&pk);
        let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() {
            Some(serialized_pk.slice(VirtualNode::SIZE..))
        } else {
            #[cfg(debug_assertions)]
            if self.prefix_hint_len != 0 {
                warn!(
                    "prefix_hint_len is not equal to pk.len(), may not be able to utilize bloom filter"
                );
            }
            None
        };
        (serialized_pk, prefix_hint)
    }
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn get(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<Option<OwnedRow>> {
        if let Some(m) = &self.metrics {
            m.get_count.inc();
        }
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode_bytes();
            return Ok(rows.get(&vnode).expect("covered vnode").get(&key).cloned());
        }

        if let Some(stats) = &self.vnode_stats
            && let (vnode, key) = key_bytes.split_vnode_bytes()
            && let Some(vnode_stat) = stats.get(&vnode)
            && vnode_stat.can_prune(&key)
        {
            if let Some(m) = &self.metrics {
                m.get_vnode_pruned_count.inc();
            }
            return Ok(None);
        }

        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };

        self.state_store
            .on_key_value(key_bytes, read_options, move |_, value| {
                let row = self.row_serde.deserialize(value)?;
                Ok(OwnedRow::new(row))
            })
            .await
            .map_err(Into::into)
    }

    async fn exists(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<bool> {
        if let Some(m) = &self.metrics {
            m.get_count.inc();
        }
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode_bytes();
            return Ok(rows.get(&vnode).expect("covered vnode").contains_key(&key));
        }

        if let Some(stats) = &self.vnode_stats
            && let (vnode, key) = key_bytes.split_vnode_bytes()
            && let Some(vnode_stat) = stats.get(&vnode)
            && vnode_stat.can_prune(&key)
        {
            if let Some(m) = &self.metrics {
                m.get_vnode_pruned_count.inc();
            }
            return Ok(false);
        }

        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };
        let result = self
            .state_store
            .on_key_value(key_bytes, read_options, move |_, _| Ok(()))
            .await?;
        Ok(result.is_some())
    }
}

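/// Handle returned by [`StateTableInner::commit`]. It is marked `#[must_use]`:
/// after yielding the barrier, the caller should invoke
/// [`StateTablePostCommit::post_yield_barrier`] so that any vnode bitmap update
/// (e.g. from scaling) is applied at the right point in the epoch protocol.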
#[must_use]
pub struct StateTablePostCommit<'a, S, SD = BasicSerde, const IS_REPLICATED: bool = false>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    inner: &'a mut StateTableInner<S, SD, IS_REPLICATED>,
}

impl<'a, S, SD, const IS_REPLICATED: bool> StateTablePostCommit<'a, S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub async fn post_yield_barrier(
        mut self,
        new_vnodes: Option<Arc<Bitmap>>,
    ) -> StreamExecutorResult<
        Option<(
            (
                Arc<Bitmap>,
                Arc<Bitmap>,
                &'a mut StateTableInner<S, SD, IS_REPLICATED>,
            ),
            bool,
        )>,
    > {
        self.inner.on_post_commit = false;
        Ok(if let Some(new_vnodes) = new_vnodes {
            let (old_vnodes, cache_may_stale) =
                self.update_vnode_bitmap(new_vnodes.clone()).await?;
            Some(((new_vnodes, old_vnodes, self.inner), cache_may_stale))
        } else {
            None
        })
    }

    pub fn inner(&self) -> &StateTableInner<S, SD, IS_REPLICATED> {
        &*self.inner
    }

    async fn update_vnode_bitmap(
        &mut self,
        new_vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<(Arc<Bitmap>, bool)> {
        let prev_vnodes = self
            .inner
            .row_store
            .update_vnode_bitmap(new_vnodes.clone())
            .await?;
        assert_eq!(
            &prev_vnodes,
            self.inner.vnodes(),
            "state table and state store vnode bitmap mismatches"
        );

        if self.inner.distribution.is_singleton() {
            assert_eq!(
                &new_vnodes,
                self.inner.vnodes(),
                "should not update vnode bitmap for singleton table"
            );
        }
        assert_eq!(self.inner.vnodes().len(), new_vnodes.len());

        let cache_may_stale = cache_may_stale(self.inner.vnodes(), &new_vnodes);

        if cache_may_stale {
            self.inner.pending_watermark = None;
        }

        Ok((
            self.inner.distribution.update_vnode_bitmap(new_vnodes),
            cache_may_stale,
        ))
    }
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    fn handle_mem_table_error(&self, e: StorageError) {
        let e = match e.into_inner() {
            ErrorKind::MemTable(e) => e,
            _ => unreachable!("should only get memtable error"),
        };
        match *e {
            MemTableError::InconsistentOperation { key, prev, new, .. } => {
                let (vnode, key) = deserialize_pk_with_vnode(&key, &self.pk_serde).unwrap();
                panic!(
                    "mem-table operation inconsistent! table_id: {}, vnode: {}, key: {:?}, prev: {}, new: {}",
                    self.table_id,
                    vnode,
                    &key,
                    prev.debug_fmt(&*self.row_serde),
                    new.debug_fmt(&*self.row_serde),
                )
            }
        }
    }

    fn insert(&mut self, key: TableKey<Bytes>, value: impl Row) {
        insane_mode_discard_point!();
        let value_bytes = self.row_serde.serialize(&value).into();

        let (vnode, key_without_vnode) = key.split_vnode_bytes();

        if self.all_rows.is_none()
            && let Some(stats) = &mut self.vnode_stats
            && let Some(vnode_stat) = stats.get_mut(&vnode)
        {
            vnode_stat.update_with_key(&key_without_vnode);
        }

        if let Some(rows) = &mut self.all_rows {
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .insert(key_without_vnode, value.into_owned_row());
        }
        self.state_store
            .insert(key, value_bytes, None)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn delete(&mut self, key: TableKey<Bytes>, value: impl Row) {
        insane_mode_discard_point!();
        let value_bytes = self.row_serde.serialize(value).into();

        let (vnode, key_without_vnode) = key.split_vnode_bytes();

        if self.all_rows.is_none()
            && let Some(stats) = &mut self.vnode_stats
            && let Some(vnode_stat) = stats.get_mut(&vnode)
        {
            vnode_stat.update_with_key(&key_without_vnode);
        }

        if let Some(rows) = &mut self.all_rows {
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .remove(&key_without_vnode);
        }
        self.state_store
            .delete(key, value_bytes)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn update(&mut self, key_bytes: TableKey<Bytes>, old_value: impl Row, new_value: impl Row) {
        insane_mode_discard_point!();
        let new_value_bytes = self.row_serde.serialize(&new_value).into();
        let old_value_bytes = self.row_serde.serialize(old_value).into();

        let (vnode, key_without_vnode) = key_bytes.split_vnode_bytes();

        if self.all_rows.is_none()
            && let Some(stats) = &mut self.vnode_stats
            && let Some(vnode_stat) = stats.get_mut(&vnode)
        {
            vnode_stat.update_with_key(&key_without_vnode);
        }

        if let Some(rows) = &mut self.all_rows {
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .insert(key_without_vnode, new_value.into_owned_row());
        }
        self.state_store
            .insert(key_bytes, new_value_bytes, Some(old_value_bytes))
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }
}

impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub fn insert(&mut self, value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&value).project(pk_indices);

        let key_bytes = self.serialize_pk(&pk);
        dispatch_value_indices!(&self.value_indices, [value], {
            self.row_store.insert(key_bytes, value)
        })
    }

    pub fn delete(&mut self, old_value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&old_value).project(pk_indices);

        let key_bytes = self.serialize_pk(&pk);
        dispatch_value_indices!(&self.value_indices, [old_value], {
            self.row_store.delete(key_bytes, old_value)
        })
    }

    pub fn update(&mut self, old_value: impl Row, new_value: impl Row) {
        let old_pk = (&old_value).project(self.pk_indices());
        let new_pk = (&new_value).project(self.pk_indices());
        debug_assert!(
            Row::eq(&old_pk, new_pk),
            "pk should not change: {old_pk:?} vs {new_pk:?}. {}",
            self.table_id
        );

        let key_bytes = self.serialize_pk(&new_pk);
        dispatch_value_indices!(&self.value_indices, [old_value, new_value], {
            self.row_store.update(key_bytes, old_value, new_value)
        })
    }

    pub fn write_record(&mut self, record: Record<impl Row>) {
        match record {
            Record::Insert { new_row } => self.insert(new_row),
            Record::Delete { old_row } => self.delete(old_row),
            Record::Update { old_row, new_row } => self.update(old_row, new_row),
        }
    }

    fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
        fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
    }

    #[allow(clippy::disallowed_methods)]
    pub fn write_chunk(&mut self, chunk: StreamChunk) {
        let chunk = if IS_REPLICATED {
            self.fill_non_output_indices(chunk)
        } else {
            chunk
        };

        let vnodes = self
            .distribution
            .compute_chunk_vnode(&chunk, &self.pk_indices);

        for (idx, optional_row) in chunk.rows_with_holes().enumerate() {
            let Some((op, row)) = optional_row else {
                continue;
            };
            let pk = row.project(&self.pk_indices);
            let vnode = vnodes[idx];
            let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, vnode);
            match op {
                Op::Insert | Op::UpdateInsert => {
                    dispatch_value_indices!(&self.value_indices, [row], {
                        self.row_store.insert(key_bytes, row);
                    });
                }
                Op::Delete | Op::UpdateDelete => {
                    dispatch_value_indices!(&self.value_indices, [row], {
                        self.row_store.delete(key_bytes, row);
                    });
                }
            }
        }
    }

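    /// Record a state-cleaning watermark. It stays pending and is only applied
    /// (written as a table watermark) on the next `commit`.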
    pub fn update_watermark(&mut self, watermark: ScalarImpl) {
        trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark");
        self.pending_watermark = Some(watermark);
    }

    pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> {
        self.committed_watermark.as_ref()
    }

    pub async fn commit(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
        self.commit_inner(new_epoch, None).await
    }

    #[cfg(test)]
    pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
        self.commit_assert_no_update_vnode_bitmap(new_epoch).await
    }

    pub async fn commit_assert_no_update_vnode_bitmap(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<()> {
        let post_commit = self.commit_inner(new_epoch, None).await?;
        post_commit.post_yield_barrier(None).await?;
        Ok(())
    }

    pub async fn commit_may_switch_consistent_op(
        &mut self,
        new_epoch: EpochPair,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
        if self.op_consistency_level != op_consistency_level {
            if !cfg!(debug_assertions) {
                info!(
                    ?new_epoch,
                    prev_op_consistency_level = ?self.op_consistency_level,
                    ?op_consistency_level,
                    table_id = %self.table_id,
                    "switch to new op consistency level"
                );
            }
            self.commit_inner(new_epoch, Some(op_consistency_level))
                .await
        } else {
            self.commit_inner(new_epoch, None).await
        }
    }

    async fn commit_inner(
        &mut self,
        new_epoch: EpochPair,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
        assert!(!self.on_post_commit);
        assert_eq!(
            self.epoch.expect("should only be called after init").curr,
            new_epoch.prev
        );
        if let Some(new_consistency_level) = switch_consistent_op {
            assert_ne!(self.op_consistency_level, new_consistency_level);
            self.op_consistency_level = new_consistency_level;
        }
        trace!(
            table_id = %self.table_id,
            epoch = ?self.epoch,
            "commit state table"
        );

        let table_watermarks = self.commit_pending_watermark();
        self.row_store
            .seal_current_epoch(new_epoch.curr, table_watermarks, switch_consistent_op)
            .await?;
        self.epoch = Some(new_epoch);

        self.on_post_commit = true;
        Ok(StateTablePostCommit { inner: self })
    }

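    /// Take the pending watermark, serialize it into a per-vnode table watermark
    /// for the state store, and record it as the committed watermark.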
    fn commit_pending_watermark(
        &mut self,
    ) -> Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)> {
        let watermark = self.pending_watermark.take()?;
        trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning");

        assert!(
            !self.pk_indices().is_empty(),
            "see pending watermark on empty pk"
        );
        let (watermark_serializer, watermark_type) = self
            .watermark_serde
            .as_ref()
            .expect("watermark serde should be initialized to commit watermark");

        let watermark_suffix =
            serialize_pk(row::once(Some(watermark.clone())), watermark_serializer);
        let vnode_watermark = VnodeWatermark::new(
            self.vnodes().clone(),
            Bytes::copy_from_slice(watermark_suffix.as_ref()),
        );

        trace!(table_id = %self.table_id, ?vnode_watermark, "table watermark");

        let order_type = watermark_serializer.get_order_types().first().unwrap();
        let direction = if order_type.is_ascending() {
            WatermarkDirection::Ascending
        } else {
            WatermarkDirection::Descending
        };

        self.committed_watermark = Some(watermark);
        Some((direction, vec![vnode_watermark], *watermark_type))
    }

    pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.row_store.try_flush().await?;
        Ok(())
    }
}

pub trait RowStream<'a>: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a {}
impl<'a, S: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a> RowStream<'a> for S {}

pub trait KeyedRowStream<'a>: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a {}
impl<'a, S: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a> KeyedRowStream<'a> for S {}

pub trait PkRowStream<'a, K>: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a {}
impl<'a, K, S: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a> PkRowStream<'a, K> for S {}

pub trait FromVnodeBytes {
    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self;
}

impl FromVnodeBytes for Bytes {
    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self {
        prefix_slice_with_vnode(vnode, bytes)
    }
}

impl FromVnodeBytes for () {
    fn from_vnode_bytes(_vnode: VirtualNode, _bytes: &Bytes) -> Self {}
}

impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub async fn iter_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_kv_with_pk_range::<()>(pk_range, vnode, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

    pub async fn iter_keyed_row_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    pub async fn iter_with_vnode_and_output_indices(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        assert!(IS_REPLICATED);
        let stream = self
            .iter_with_vnode(vnode, pk_range, prefetch_options)
            .await?;
        Ok(stream.map(|row| row.map(|row| row.project(&self.output_indices).into_owned_row())))
    }
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn iter_kv<K: CopyFromSlice + FromVnodeBytes>(
        &self,
        vnode: VirtualNode,
        (start, end): (Bound<Bytes>, Bound<Bytes>),
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        if let Some(m) = &self.metrics {
            m.iter_count.inc();
        }
        let (pruned_start, pruned_end) = if let Some(stats) = &self.vnode_stats
            && let Some(vnode_stat) = stats.get(&vnode)
        {
            match vnode_stat.pruned_key_range(&start, &end) {
                Some((new_start, new_end)) => (new_start, new_end),
                None => {
                    if let Some(m) = &self.metrics {
                        m.iter_vnode_pruned_count.inc();
                    }
                    return Ok(futures::future::Either::Left(futures::stream::empty()));
                }
            }
        } else {
            (start, end)
        };

        if let Some(rows) = &self.all_rows {
            return Ok(futures::future::Either::Right(
                futures::future::Either::Left(futures::stream::iter(
                    rows.get(&vnode)
                        .expect("covered vnode")
                        .range((pruned_start, pruned_end))
                        .map(move |(key, value)| {
                            Ok((K::from_vnode_bytes(vnode, key), value.clone()))
                        }),
                )),
            ));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(futures::future::Either::Right(
            futures::future::Either::Right(deserialize_keyed_row_stream(
                self.state_store
                    .iter(
                        prefixed_range_with_vnode((pruned_start, pruned_end), vnode),
                        read_options,
                    )
                    .await?,
                &*self.row_serde,
            )),
        ))
    }

    async fn rev_iter_kv<K: CopyFromSlice + FromVnodeBytes>(
        &self,
        vnode: VirtualNode,
        (start, end): (Bound<Bytes>, Bound<Bytes>),
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        if let Some(m) = &self.metrics {
            m.iter_count.inc();
        }
        let (pruned_start, pruned_end) = if let Some(stats) = &self.vnode_stats
            && let Some(vnode_stat) = stats.get(&vnode)
        {
            match vnode_stat.pruned_key_range(&start, &end) {
                Some((new_start, new_end)) => (new_start, new_end),
                None => {
                    if let Some(m) = &self.metrics {
                        m.iter_vnode_pruned_count.inc();
                    }
                    return Ok(futures::future::Either::Left(futures::stream::empty()));
                }
            }
        } else {
            (start, end)
        };

        if let Some(rows) = &self.all_rows {
            return Ok(futures::future::Either::Right(
                futures::future::Either::Left(futures::stream::iter(
                    rows.get(&vnode)
                        .expect("covered vnode")
                        .range((pruned_start, pruned_end))
                        .rev()
                        .map(move |(key, value)| {
                            Ok((K::from_vnode_bytes(vnode, key), value.clone()))
                        }),
                )),
            ));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(futures::future::Either::Right(
            futures::future::Either::Right(deserialize_keyed_row_stream(
                self.state_store
                    .rev_iter(
                        prefixed_range_with_vnode((pruned_start, pruned_end), vnode),
                        read_options,
                    )
                    .await?,
                &*self.row_serde,
            )),
        ))
    }
}

impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
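    /// Iterate over rows whose pk starts with `pk_prefix`, further restricted by
    /// `sub_range` over the remaining pk columns. The prefix must be enough to
    /// compute the vnode.
    ///
    /// A usage sketch (assuming a `table` whose first pk column is an `Int32`):
    ///
    /// ```ignore
    /// let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
    /// let stream = table
    ///     .iter_with_prefix(row::once(Some(1_i32.into())), sub_range, Default::default())
    ///     .await?;
    /// ```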
    pub async fn iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        let stream = self
            .iter_with_prefix_inner::<false, ()>(pk_prefix, sub_range, prefetch_options)
            .await?;
        Ok(stream.map_ok(|(_, row)| row))
    }

    pub async fn get_from_one_row_table(&self) -> StreamExecutorResult<Option<OwnedRow>> {
        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
        let stream = self
            .iter_with_prefix(row::empty(), sub_range, Default::default())
            .await?;
        pin_mut!(stream);

        if let Some(res) = stream.next().await {
            let value = res?.into_owned_row();
            assert!(stream.next().await.is_none());
            Ok(Some(value))
        } else {
            Ok(None)
        }
    }

    pub async fn get_from_one_value_table(&self) -> StreamExecutorResult<Option<ScalarImpl>> {
        Ok(self
            .get_from_one_row_table()
            .await?
            .and_then(|row| row[0].clone()))
    }

    pub async fn iter_keyed_row_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::<false, Bytes>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    pub async fn rev_iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::<true, ()>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

    async fn iter_with_prefix_inner<const REVERSE: bool, K: CopyFromSlice + FromVnodeBytes>(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
        let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);

        let vnode = self.compute_prefix_vnode(&pk_prefix);

        let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
        if self.prefix_hint_len != 0 {
            debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
        }
        let prefix_hint = {
            if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() {
                None
            } else {
                let encoded_prefix_len = self
                    .pk_serde
                    .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;

                Some(Bytes::copy_from_slice(
                    &encoded_prefix[..encoded_prefix_len],
                ))
            }
        };

        trace!(
            table_id = %self.table_id(),
            ?prefix_hint, ?pk_prefix,
            ?pk_prefix_indices,
            iter_direction = if REVERSE { "reverse" } else { "forward" },
            "storage_iter_with_prefix"
        );

        let memcomparable_range =
            prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);

        Ok(if REVERSE {
            futures::future::Either::Left(
                self.row_store
                    .rev_iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
                    .await?,
            )
        } else {
            futures::future::Either::Right(
                self.row_store
                    .iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
                    .await?,
            )
        })
    }

    async fn iter_kv_with_pk_range<'a, K: CopyFromSlice + FromVnodeBytes>(
        &'a self,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'a, K>> {
        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);

        self.row_store
            .iter_kv(vnode, memcomparable_range, None, prefetch_options)
            .await
    }
}

fn deserialize_keyed_row_stream<'a, K: CopyFromSlice>(
    iter: impl StateStoreIter + 'a,
    deserializer: &'a impl ValueRowSerde,
) -> impl PkRowStream<'a, K> {
    iter.into_stream(move |(key, value)| {
        Ok((
            K::copy_from_slice(key.user_key.table_key.as_ref()),
            deserializer.deserialize(value).map(OwnedRow::new)?,
        ))
    })
    .map_err(Into::into)
}

pub fn prefix_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    range: &(Bound<impl Row>, Bound<impl Row>),
) -> (Bound<Bytes>, Bound<Bytes>) {
    (
        start_range_to_memcomparable(pk_serde, &range.0),
        end_range_to_memcomparable(pk_serde, &range.1, None),
    )
}

fn prefix_and_sub_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    sub_range: &(Bound<impl Row>, Bound<impl Row>),
    pk_prefix: impl Row,
) -> (Bound<Bytes>, Bound<Bytes>) {
    let (range_start, range_end) = sub_range;
    let prefix_serializer = pk_serde.prefix(pk_prefix.len());
    let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
    let start_range = match range_start {
        Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))),
        Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))),
        Unbounded => Bound::Included(Either::Right(&pk_prefix)),
    };
    let end_range = match range_end {
        Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)),
        Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)),
        Unbounded => Unbounded,
    };
    (
        start_range_to_memcomparable(pk_serde, &start_range),
        end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
    )
}

fn start_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => Unbounded,
        Included(r) => {
            let serialized = serialize_pk_prefix(r);

            Included(serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);

            start_bound_of_excluded_prefix(&serialized)
        }
    }
}

fn end_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
    serialized_pk_prefix: Option<Bytes>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => match serialized_pk_prefix {
            Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix),
            None => Unbounded,
        },
        Included(r) => {
            let serialized = serialize_pk_prefix(r);

            end_bound_of_prefix(&serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            Excluded(serialized)
        }
    }
}

fn fill_non_output_indices(
    i2o_mapping: &ColIndexMapping,
    data_types: &[DataType],
    chunk: StreamChunk,
) -> StreamChunk {
    let cardinality = chunk.cardinality();
    let (ops, columns, vis) = chunk.into_inner();
    let mut full_columns = Vec::with_capacity(data_types.len());
    for (i, data_type) in data_types.iter().enumerate() {
        if let Some(j) = i2o_mapping.try_map(i) {
            full_columns.push(columns[j].clone());
        } else {
            let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
            column_builder.append_n_null(cardinality);
            let column: ArrayRef = column_builder.finish().into();
            full_columns.push(column)
        }
    }
    let data_chunk = DataChunk::new(full_columns, vis);
    StreamChunk::from_parts(ops, data_chunk)
}

#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    fn check(actual: impl Debug, expect: Expect) {
        let actual = format!("{:#?}", actual);
        expect.assert_eq(&actual);
    }

    #[test]
    fn test_fill_non_output_indices() {
        let data_types = vec![DataType::Int32, DataType::Int32, DataType::Int32];
        let replicated_chunk = [OwnedRow::new(vec![
            Some(222_i32.into()),
            Some(2_i32.into()),
        ])];
        let replicated_chunk = StreamChunk::from_parts(
            vec![Op::Insert],
            DataChunk::from_rows(&replicated_chunk, &[DataType::Int32, DataType::Int32]),
        );
        let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
        let filled_chunk = fill_non_output_indices(&i2o_mapping, &data_types, replicated_chunk);
        check(
            filled_chunk,
            expect![[r#"
                StreamChunk { cardinality: 1, capacity: 1, data:
                +---+---+---+-----+
                | + | 2 |   | 222 |
                +---+---+---+-----+
                }"#]],
        );
    }
}