use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;
use std::ops::Bound;
use std::ops::Bound::*;
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use either::Either;
use foyer::Hint;
use futures::future::try_join_all;
use futures::{Stream, StreamExt, TryStreamExt, pin_mut};
use futures_async_stream::for_await;
use itertools::Itertools;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnDesc, ColumnId, TableId, TableOption, get_dist_key_in_pk_indices,
};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
use risingwave_common::row::{self, Once, OwnedRow, Row, RowExt, once};
use risingwave_common::types::{DataType, Datum, DefaultOrd, DefaultOrdered, ScalarImpl};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{
    CopyFromSlice, TableKey, end_bound_of_prefix, next_key, prefix_slice_with_vnode,
    prefixed_range_with_vnode, start_bound_of_excluded_prefix,
};
use risingwave_hummock_sdk::table_watermark::{
    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_pb::catalog::Table;
use risingwave_storage::StateStore;
use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::mem_table::MemTableError;
use risingwave_storage::row_serde::find_columns_by_ids;
use risingwave_storage::row_serde::row_serde_util::{
    deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode,
};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::*;
use risingwave_storage::table::merge_sort::merge_sort;
use risingwave_storage::table::{KeyedRow, TableDistribution};
use thiserror_ext::AsReport;
use tracing::{Instrument, trace};

use crate::cache::cache_may_stale;
use crate::common::state_cache::{StateCache, StateCacheFiller};
use crate::common::table::state_table_cache::StateTableWatermarkCache;
use crate::executor::StreamExecutorResult;

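/// Capacity of the in-memory watermark cache (used when `USE_WATERMARK_CACHE` is enabled):
/// the number of lowest watermark-column keys tracked per table. The value is a heuristic.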
const WATERMARK_CACHE_ENTRIES: usize = 16;

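/// In "insane" consistency mode (a testing mode, see `crate::consistency`), randomly discard
/// ~30% of write operations before they reach the mem-table, to exercise tolerance to
/// inconsistent state.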
macro_rules! insane_mode_discard_point {
    () => {{
        use rand::Rng;
        if crate::consistency::insane() && rand::rng().random_bool(0.3) {
            return;
        }
    }};
}

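/// `StateTableInner` is the interface for accessing relational data in KV (`StateStore`)
/// with row-based encoding.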
pub struct StateTableInner<
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Id of this table.
    table_id: TableId,

    /// Storage layer: the local state store, plus an optional in-memory copy of all rows.
    row_store: StateTableRowStore<S::Local, SD>,

    /// The global state store, used e.g. for waiting for an epoch to be committed.
    store: S,

    /// Current epoch. `None` before `init_epoch` is called.
    epoch: Option<EpochPair>,

    /// Used for serializing and deserializing the primary key.
    pk_serde: OrderedRowSerde,

    /// Indices of the primary key columns, based on all columns of the table.
    pk_indices: Vec<usize>,

    /// Distribution of the state table.
    distribution: TableDistribution,

    prefix_hint_len: usize,

    /// Indices of the columns persisted as the value. `None` means all columns.
    value_indices: Option<Vec<usize>>,

    /// Watermark reported via `update_watermark` but not yet committed.
    pending_watermark: Option<ScalarImpl>,
    /// Latest committed watermark for state cleaning.
    committed_watermark: Option<ScalarImpl>,
    /// Cache of the lowest watermark-column keys, used to avoid unnecessary range deletions.
    watermark_cache: StateTableWatermarkCache,

    /// Data types of all columns; used by the replicated table to fill non-output columns.
    data_types: Vec<DataType>,

    /// Mapping from the table schema ("i") to the output schema ("o") of a replicated table.
    i2o_mapping: ColIndexMapping,

    /// Output column indices of a replicated table.
    pub output_indices: Vec<usize>,

    op_consistency_level: StateTableOpConsistencyLevel,

    /// Index (in the pk) of the column that watermark-based state cleaning applies to.
    clean_watermark_index_in_pk: Option<i32>,

    /// Whether a `commit` has been issued and `post_yield_barrier` is still pending.
    on_post_commit: bool,
}

/// The default `StateTable` with `BasicSerde` as the value encoding.
pub type StateTable<S> = StateTableInner<S, BasicSerde>;
/// A state table that replicates an upstream table, e.g. for arrangement backfill.
pub type ReplicatedStateTable<S, SD> = StateTableInner<S, SD, true>;
/// A state table that additionally caches the watermark column to reduce state-cleaning
/// overhead.
pub type WatermarkCacheStateTable<S> = StateTableInner<S, BasicSerde, false, true>;
pub type WatermarkCacheParameterizedStateTable<S, const USE_WATERMARK_CACHE: bool> =
    StateTableInner<S, BasicSerde, false, USE_WATERMARK_CACHE>;

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Initialize the state table with the given epoch. Must be called exactly once, before
    /// any other operation.
    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.row_store
            .init(epoch, self.distribution.vnodes())
            .await?;
        assert_eq!(None, self.epoch.replace(epoch), "should not init twice");
        Ok(())
    }

    /// Wait until `prev_epoch` is committed in the state store.
    pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
        self.store
            .try_wait_epoch(
                HummockReadEpoch::Committed(prev_epoch),
                TryWaitEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await
    }

    pub fn state_store(&self) -> &S {
        &self.store
    }
}

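/// Build the sanity-check closure for `OpConsistencyLevel::ConsistentOldValue`: two old
/// values are considered equal if their bytes match, or if they deserialize to the same row.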
fn consistent_old_value_op(
    row_serde: Arc<impl ValueRowSerde>,
    is_log_store: bool,
) -> OpConsistencyLevel {
    OpConsistencyLevel::ConsistentOldValue {
        check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
            if first == second {
                return true;
            }
            let first = match row_serde.deserialize(first) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?first, "fail to deserialize serialized value");
                    return false;
                }
            };
            let second = match row_serde.deserialize(second) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?second, "fail to deserialize serialized value");
                    return false;
                }
            };
            if first != second {
                error!(first = ?first, second = ?second, "sanity check fail");
                false
            } else {
                true
            }
        }),
        is_log_store,
    }
}

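/// If `value_indices` is set, project the given row variable(s) onto it before evaluating
/// `$body`; otherwise evaluate `$body` on the full row(s).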
macro_rules! dispatch_value_indices {
    ($value_indices:expr, [$($row_var_name:ident),+], $body:expr) => {
        if let Some(value_indices) = $value_indices {
            $(
                let $row_var_name = $row_var_name.project(value_indices);
            )+
            $body
        } else {
            $body
        }
    };
}

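/// Storage layer of the state table: wraps the local state store and, when row preloading is
/// enabled, an in-memory copy of all rows (`all_rows`) keyed by vnode and serialized key.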
struct StateTableRowStore<LS: LocalStateStore, SD: ValueRowSerde> {
    state_store: LS,
    /// `Some` when all rows are preloaded in memory; kept in sync with every write.
    all_rows: Option<HashMap<VirtualNode, BTreeMap<Bytes, OwnedRow>>>,

    table_id: TableId,
    table_option: TableOption,
    row_serde: Arc<SD>,
    pk_serde: OrderedRowSerde,
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    /// If row preloading is enabled, reload all rows of the given vnodes from the state store.
    async fn may_reload_all_rows(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        if let Some(rows) = &mut self.all_rows {
            rows.clear();
            let start_time = Instant::now();
            *rows = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| {
                let state_store = &self.state_store;
                let retention_seconds = self.table_option.retention_seconds;
                let row_serde = &self.row_serde;
                async move {
                    let mut rows = BTreeMap::new();
                    let memcomparable_range_with_vnode =
                        prefixed_range_with_vnode::<Bytes>(.., vnode);
                    let stream = deserialize_keyed_row_stream::<Bytes>(
                        state_store
                            .iter(
                                memcomparable_range_with_vnode,
                                ReadOptions {
                                    prefix_hint: None,
                                    prefetch_options: Default::default(),
                                    cache_policy: Default::default(),
                                    retention_seconds,
                                },
                            )
                            .await?,
                        &**row_serde,
                    );
                    pin_mut!(stream);
                    while let Some((encoded_key, row)) = stream.try_next().await? {
                        let key = TableKey(encoded_key);
                        let (iter_vnode, key) = key.split_vnode_bytes();
                        assert_eq!(vnode, iter_vnode);
                        rows.try_insert(key, row).expect("non-duplicated");
                    }
                    Ok((vnode, rows)) as StreamExecutorResult<_>
                }
            }))
            .await?
            .into_iter()
            .collect();
            if !cfg!(debug_assertions) {
                // The reload summary is only logged in non-debug builds.
                info!(table_id = %self.table_id, vnode_count = vnode_bitmap.count_ones(), duration = ?start_time.elapsed(), "finished reloading all rows");
            }
        }
        Ok(())
    }

    async fn init(&mut self, epoch: EpochPair, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        self.state_store.init(InitOptions::new(epoch)).await?;
        self.may_reload_all_rows(vnode_bitmap).await
    }

    async fn update_vnode_bitmap(
        &mut self,
        vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<Arc<Bitmap>> {
        let prev_vnodes = self.state_store.update_vnode_bitmap(vnodes.clone()).await?;
        self.may_reload_all_rows(&vnodes).await?;
        Ok(prev_vnodes)
    }

    async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.state_store.try_flush().await?;
        Ok(())
    }

    async fn seal_current_epoch(
        &mut self,
        next_epoch: u64,
        table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<()> {
        if let Some((direction, watermarks, serde_type)) = &table_watermarks
            && let Some(rows) = &mut self.all_rows
        {
            // Apply state cleaning to the in-memory rows as well, so that they stay
            // consistent with the state store.
            match serde_type {
                WatermarkSerdeType::PkPrefix => {
                    for vnode_watermark in watermarks {
                        match direction {
                            WatermarkDirection::Ascending => {
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    // Keep only keys >= watermark.
                                    *rows = rows.split_off(vnode_watermark.watermark());
                                }
                            }
                            WatermarkDirection::Descending => {
                                let split_off_key = next_key(vnode_watermark.watermark());
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    // Drop keys > watermark, keeping the rest.
                                    rows.split_off(split_off_key.as_slice());
                                }
                            }
                        }
                    }
                }
                WatermarkSerdeType::NonPkPrefix => {
                    warn!(table_id = %self.table_id, "row preloading disabled: a non-pk-prefix watermark was written");
                    self.all_rows = None;
                }
            }
        }
        self.state_store
            .flush()
            .instrument(tracing::info_span!("state_table_flush"))
            .await?;
        let switch_op_consistency_level =
            switch_consistent_op.map(|new_consistency_level| match new_consistency_level {
                StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
                StateTableOpConsistencyLevel::ConsistentOldValue => {
                    consistent_old_value_op(self.row_serde.clone(), false)
                }
                StateTableOpConsistencyLevel::LogStoreEnabled => {
                    consistent_old_value_op(self.row_serde.clone(), true)
                }
            });
        self.state_store.seal_current_epoch(
            next_epoch,
            SealCurrentEpochOptions {
                table_watermarks,
                switch_op_consistency_level,
            },
        );
        Ok(())
    }
}

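/// Consistency requirement on write operations (insert/delete/update) of the state table.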
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
    /// Writes are inconsistent: no sanity check is performed.
    Inconsistent,
    /// Writes are consistent: an insert must not overwrite an existing key, and the old value
    /// passed to delete/update must match what is stored.
    ConsistentOldValue,
    /// Same consistency requirement as `ConsistentOldValue`, with the log store enabled on
    /// the state store.
    LogStoreEnabled,
}

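/// Builder for `StateTableInner`. The `PreloadAllRow` type parameter starts as `()` and
/// becomes `bool` once the caller decides whether to preload all rows (via
/// `enable_preload_all_rows_by_config` or `forbid_preload_all_rows`), which must happen
/// before `build`.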
pub struct StateTableBuilder<
    'a,
    S,
    SD,
    const IS_REPLICATED: bool,
    const USE_WATERMARK_CACHE: bool,
    PreloadAllRow,
> {
    table_catalog: &'a Table,
    store: S,
    vnodes: Option<Arc<Bitmap>>,
    op_consistency_level: Option<StateTableOpConsistencyLevel>,
    output_column_ids: Option<Vec<ColumnId>>,
    preload_all_rows: PreloadAllRow,

    _serde: PhantomData<SD>,
}

impl<
    'a,
    S: StateStore,
    SD: ValueRowSerde,
    const IS_REPLICATED: bool,
    const USE_WATERMARK_CACHE: bool,
> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, ()>
{
    pub fn new(table_catalog: &'a Table, store: S, vnodes: Option<Arc<Bitmap>>) -> Self {
        Self {
            table_catalog,
            store,
            vnodes,
            op_consistency_level: None,
            output_column_ids: None,
            preload_all_rows: (),
            _serde: Default::default(),
        }
    }

    fn with_preload_all_rows(
        self,
        preload_all_rows: bool,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, bool> {
        StateTableBuilder {
            table_catalog: self.table_catalog,
            store: self.store,
            vnodes: self.vnodes,
            op_consistency_level: self.op_consistency_level,
            output_column_ids: self.output_column_ids,
            preload_all_rows,
            _serde: Default::default(),
        }
    }

    /// Decide from the streaming config whether to preload all rows: when preloading is
    /// enabled by default, tables on the blacklist are excluded; otherwise only whitelisted
    /// tables are preloaded.
    pub fn enable_preload_all_rows_by_config(
        self,
        config: &StreamingConfig,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, bool> {
        let developer = &config.developer;
        let preload_all_rows = if developer.default_enable_mem_preload_state_table {
            !developer
                .mem_preload_state_table_ids_blacklist
                .contains(&self.table_catalog.id)
        } else {
            developer
                .mem_preload_state_table_ids_whitelist
                .contains(&self.table_catalog.id)
        };
        self.with_preload_all_rows(preload_all_rows)
    }

    pub fn forbid_preload_all_rows(
        self,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, bool> {
        self.with_preload_all_rows(false)
    }
}

impl<
    'a,
    S: StateStore,
    SD: ValueRowSerde,
    const IS_REPLICATED: bool,
    const USE_WATERMARK_CACHE: bool,
    PreloadAllRow,
> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, PreloadAllRow>
{
    pub fn with_op_consistency_level(
        mut self,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> Self {
        self.op_consistency_level = Some(op_consistency_level);
        self
    }

    pub fn with_output_column_ids(mut self, output_column_ids: Vec<ColumnId>) -> Self {
        self.output_column_ids = Some(output_column_ids);
        self
    }
}

impl<
    'a,
    S: StateStore,
    SD: ValueRowSerde,
    const IS_REPLICATED: bool,
    const USE_WATERMARK_CACHE: bool,
> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, bool>
{
    pub async fn build(self) -> StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE> {
        let mut preload_all_rows = self.preload_all_rows;
        if preload_all_rows
            && let Err(e) =
                risingwave_common::license::Feature::StateTableMemoryPreload.check_available()
        {
            warn!(table_id = self.table_catalog.id, e = %e.as_report(), "table is configured to preload rows into memory, but the feature is unavailable under the current license");
            preload_all_rows = false;
        }
        StateTableInner::from_table_catalog_inner(
            self.table_catalog,
            self.store,
            self.vnodes,
            self.op_consistency_level
                .unwrap_or(StateTableOpConsistencyLevel::ConsistentOldValue),
            self.output_column_ids.unwrap_or_default(),
            preload_all_rows,
        )
        .await
    }
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create the state table from a table catalog and the state store.
    #[cfg(any(test, feature = "test"))]
    pub async fn from_table_catalog(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    /// Create the state table without consistency checks on write operations.
    pub async fn from_table_catalog_inconsistent_op(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    async fn from_table_catalog_inner(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        op_consistency_level: StateTableOpConsistencyLevel,
        output_column_ids: Vec<ColumnId>,
        preload_all_rows: bool,
    ) -> Self {
        let table_id = TableId::new(table_catalog.id);
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();
        let data_types: Vec<DataType> = table_catalog
            .columns
            .iter()
            .map(|col| {
                col.get_column_desc()
                    .unwrap()
                    .get_column_type()
                    .unwrap()
                    .into()
            })
            .collect();
        let order_types: Vec<OrderType> = table_catalog
            .pk
            .iter()
            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
            .collect();
        let dist_key_indices: Vec<usize> = table_catalog
            .distribution_key
            .iter()
            .map(|dist_index| *dist_index as usize)
            .collect();

        let pk_indices = table_catalog
            .pk
            .iter()
            .map(|col_order| col_order.column_index as usize)
            .collect_vec();

        // If `dist_key_in_pk` is not specified in the catalog, compute it from the
        // distribution key and pk indices.
        let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() {
            get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices).unwrap()
        } else {
            table_catalog
                .get_dist_key_in_pk()
                .iter()
                .map(|idx| *idx as usize)
                .collect()
        };

        let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
            let vnode_col_idx = *idx as usize;
            pk_indices.iter().position(|&i| vnode_col_idx == i)
        });

        let distribution =
            TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
        assert_eq!(
            distribution.vnode_count(),
            table_catalog.vnode_count(),
            "vnode count mismatch, scanning table {} under wrong distribution?",
            table_catalog.name,
        );

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);

        let input_value_indices = table_catalog
            .value_indices
            .iter()
            .map(|val| *val as usize)
            .collect_vec();

        let no_shuffle_value_indices = (0..table_columns.len()).collect_vec();

        // `None` means all columns are stored as the value, in their original order.
        let value_indices = match input_value_indices.len() == table_columns.len()
            && input_value_indices == no_shuffle_value_indices
        {
            true => None,
            false => Some(input_value_indices),
        };
        let prefix_hint_len = table_catalog.read_prefix_len_hint as usize;

        let row_serde = Arc::new(SD::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.clone().into_boxed_slice()),
        ));

        let state_table_op_consistency_level = op_consistency_level;
        let op_consistency_level = match op_consistency_level {
            StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
            StateTableOpConsistencyLevel::ConsistentOldValue => {
                consistent_old_value_op(row_serde.clone(), false)
            }
            StateTableOpConsistencyLevel::LogStoreEnabled => {
                consistent_old_value_op(row_serde.clone(), true)
            }
        };

        let table_option = TableOption::new(table_catalog.retention_seconds);
        let new_local_options = if IS_REPLICATED {
            NewLocalOptions::new_replicated(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
            )
        } else {
            NewLocalOptions::new(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
                true,
            )
        };
        let local_state_store = store.new_local(new_local_options).await;

        // A versioned table supports schema changes, which requires the value encoding to be
        // column-aware, and vice versa.
        assert_eq!(
            table_catalog.version.is_some(),
            row_serde.kind().is_column_aware()
        );

        // Restore the committed watermark (for state cleaning) persisted in the state store.
        let watermark_serde = if pk_indices.is_empty() {
            None
        } else {
            match table_catalog.clean_watermark_index_in_pk {
                None => Some(pk_serde.index(0)),
                Some(clean_watermark_index_in_pk) => {
                    Some(pk_serde.index(clean_watermark_index_in_pk as usize))
                }
            }
        };
        let max_watermark_of_vnodes = distribution
            .vnodes()
            .iter_vnodes()
            .filter_map(|vnode| local_state_store.get_table_watermark(vnode))
            .max();
        let committed_watermark = if let Some(deser) = watermark_serde
            && let Some(max_watermark) = max_watermark_of_vnodes
        {
            let deserialized = deser.deserialize(&max_watermark).ok().and_then(|row| {
                assert!(row.len() == 1);
                row[0].clone()
            });
            if deserialized.is_none() {
                tracing::error!(
                    vnodes = ?distribution.vnodes(),
                    watermark = ?max_watermark,
                    "Failed to deserialize persisted watermark from state store.",
                );
            }
            deserialized
        } else {
            None
        };

        let watermark_cache = if USE_WATERMARK_CACHE {
            StateTableWatermarkCache::new(WATERMARK_CACHE_ENTRIES)
        } else {
            StateTableWatermarkCache::new(0)
        };

        // For a replicated table, build the mapping from the table schema to the output
        // schema.
        let output_column_ids_to_input_idx = output_column_ids
            .iter()
            .enumerate()
            .map(|(pos, id)| (*id, pos))
            .collect::<HashMap<_, _>>();

        let columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|c| c.column_desc.as_ref().unwrap().into())
            .collect_vec();

        let mut i2o_mapping = vec![None; columns.len()];
        for (i, column) in columns.iter().enumerate() {
            if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
                i2o_mapping[i] = Some(*pos);
            }
        }
        let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());

        let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);

        Self {
            table_id,
            row_store: StateTableRowStore {
                all_rows: preload_all_rows.then(HashMap::new),
                table_option,
                state_store: local_state_store,
                row_serde,
                pk_serde: pk_serde.clone(),
                table_id,
            },
            store,
            epoch: None,
            pk_serde,
            pk_indices,
            distribution,
            prefix_hint_len,
            value_indices,
            pending_watermark: None,
            committed_watermark,
            watermark_cache,
            data_types,
            output_indices,
            i2o_mapping,
            op_consistency_level: state_table_op_consistency_level,
            clean_watermark_index_in_pk: table_catalog.clean_watermark_index_in_pk,
            on_post_commit: false,
        }
    }

    pub fn get_data_types(&self) -> &[DataType] {
        &self.data_types
    }

    pub fn table_id(&self) -> u32 {
        self.table_id.table_id
    }

    /// Get the vnode for the given prefix of the primary key. The prefix must cover the
    /// distribution key.
    fn compute_prefix_vnode(&self, pk_prefix: &impl Row) -> VirtualNode {
        self.distribution
            .try_compute_vnode_by_pk_prefix(pk_prefix)
            .expect("For streaming, the given prefix must be enough to calculate the vnode")
    }

    /// Get the vnode of the given primary key.
    pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
        self.distribution.compute_vnode_by_pk(pk)
    }

    /// Indices of the primary key columns, based on all columns of the table.
    pub fn pk_indices(&self) -> &[usize] {
        &self.pk_indices
    }

    /// For a replicated table: positions of the primary key columns in the output columns.
    /// Returns `None` if any pk column is not included in the output.
    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
        assert!(IS_REPLICATED);
        self.pk_indices
            .iter()
            .map(|&i| self.output_indices.iter().position(|&j| i == j))
            .collect()
    }

    pub fn pk_serde(&self) -> &OrderedRowSerde {
        &self.pk_serde
    }

    pub fn vnodes(&self) -> &Arc<Bitmap> {
        self.distribution.vnodes()
    }

    pub fn value_indices(&self) -> &Option<Vec<usize>> {
        &self.value_indices
    }

    pub fn is_consistent_op(&self) -> bool {
        matches!(
            self.op_consistency_level,
            StateTableOpConsistencyLevel::ConsistentOldValue
                | StateTableOpConsistencyLevel::LogStoreEnabled
        )
    }
}

impl<S, SD, const USE_WATERMARK_CACHE: bool> StateTableInner<S, SD, true, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create a replicated state table from the table catalog, with the given output columns.
    /// Writes to a replicated table are not consistency-checked.
    pub async fn new_replicated(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .with_output_column_ids(output_column_ids)
            .forbid_preload_all_rows()
            .build()
            .await
    }
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Get a single row by the given primary key. For a replicated table, the row is
    /// projected onto the output columns.
    pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        let row = self.row_store.get(serialized_pk, prefix_hint).await?;
        match row {
            Some(row) => {
                if IS_REPLICATED {
                    let row = row.project(&self.output_indices);
                    Ok(Some(row.into_owned_row()))
                } else {
                    Ok(Some(row))
                }
            }
            None => Ok(None),
        }
    }

    /// Check whether a row with the given primary key exists.
    pub async fn exists(&self, pk: impl Row) -> StreamExecutorResult<bool> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        self.row_store.exists(serialized_pk, prefix_hint).await
    }

    fn serialize_pk(&self, pk: &impl Row) -> TableKey<Bytes> {
        assert!(pk.len() <= self.pk_indices.len());
        serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk))
    }

    fn serialize_pk_and_get_prefix_hint(&self, pk: &impl Row) -> (TableKey<Bytes>, Option<Bytes>) {
        let serialized_pk = self.serialize_pk(&pk);
        let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() {
            Some(serialized_pk.slice(VirtualNode::SIZE..))
        } else {
            #[cfg(debug_assertions)]
            if self.prefix_hint_len != 0 {
                warn!(
                    "prefix_hint_len is not equal to pk.len(), may not be able to utilize bloom filter"
                );
            }
            None
        };
        (serialized_pk, prefix_hint)
    }
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn get(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<Option<OwnedRow>> {
        // Fast path: serve the read from the preloaded rows.
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode();
            return Ok(rows.get(&vnode).expect("covered vnode").get(key).cloned());
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };

        let row_serde = self.row_serde.clone();

        self.state_store
            .on_key_value(key_bytes, read_options, move |_, value| {
                let row = row_serde.deserialize(value)?;
                Ok(OwnedRow::new(row))
            })
            .await
            .map_err(Into::into)
    }

    async fn exists(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<bool> {
        // Fast path: serve the read from the preloaded rows.
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode();
            return Ok(rows.get(&vnode).expect("covered vnode").contains_key(key));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };
        let result = self
            .state_store
            .on_key_value(key_bytes, read_options, move |_, _| Ok(()))
            .await?;
        Ok(result.is_some())
    }
}

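/// Handle returned from `commit`, marked `#[must_use]`: after yielding the barrier, the
/// caller must call `post_yield_barrier`, passing the new vnode bitmap if the table is being
/// rescheduled, so that vnode bitmap updates are applied at the correct point.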
#[must_use]
pub struct StateTablePostCommit<
    'a,
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    inner: &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
}

impl<'a, S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTablePostCommit<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Finish the commit after the barrier has been yielded. When a new vnode bitmap is
    /// given, returns `(new_vnodes, old_vnodes, table)` along with whether caches may have
    /// become stale.
    pub async fn post_yield_barrier(
        mut self,
        new_vnodes: Option<Arc<Bitmap>>,
    ) -> StreamExecutorResult<
        Option<(
            (
                Arc<Bitmap>,
                Arc<Bitmap>,
                &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
            ),
            bool,
        )>,
    > {
        self.inner.on_post_commit = false;
        Ok(if let Some(new_vnodes) = new_vnodes {
            let (old_vnodes, cache_may_stale) =
                self.update_vnode_bitmap(new_vnodes.clone()).await?;
            Some(((new_vnodes, old_vnodes, self.inner), cache_may_stale))
        } else {
            None
        })
    }

    pub fn inner(&self) -> &StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE> {
        &*self.inner
    }

    /// Update the vnode bitmap of the state table; returns the previous vnode bitmap and
    /// whether caches may have become stale.
    async fn update_vnode_bitmap(
        &mut self,
        new_vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<(Arc<Bitmap>, bool)> {
        let prev_vnodes = self
            .inner
            .row_store
            .update_vnode_bitmap(new_vnodes.clone())
            .await?;
        assert_eq!(
            &prev_vnodes,
            self.inner.vnodes(),
            "state table and state store vnode bitmap mismatches"
        );

        if self.inner.distribution.is_singleton() {
            assert_eq!(
                &new_vnodes,
                self.inner.vnodes(),
                "should not update vnode bitmap for singleton table"
            );
        }
        assert_eq!(self.inner.vnodes().len(), new_vnodes.len());

        let cache_may_stale = cache_may_stale(self.inner.vnodes(), &new_vnodes);

        if cache_may_stale {
            self.inner.pending_watermark = None;
            if USE_WATERMARK_CACHE {
                self.inner.watermark_cache.clear();
            }
        }

        Ok((
            self.inner.distribution.update_vnode_bitmap(new_vnodes),
            cache_may_stale,
        ))
    }
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    /// Decode the offending key and values of a mem-table inconsistency, then panic.
    fn handle_mem_table_error(&self, e: StorageError) {
        let e = match e.into_inner() {
            ErrorKind::MemTable(e) => e,
            _ => unreachable!("should only get memtable error"),
        };
        match *e {
            MemTableError::InconsistentOperation { key, prev, new, .. } => {
                let (vnode, key) = deserialize_pk_with_vnode(&key, &self.pk_serde).unwrap();
                panic!(
                    "mem-table operation inconsistent! table_id: {}, vnode: {}, key: {:?}, prev: {}, new: {}",
                    self.table_id,
                    vnode,
                    &key,
                    prev.debug_fmt(&*self.row_serde),
                    new.debug_fmt(&*self.row_serde),
                )
            }
        }
    }

    fn insert(&mut self, key: TableKey<Bytes>, value: impl Row) {
        insane_mode_discard_point!();
        let value_bytes = self.row_serde.serialize(&value).into();
        // Keep the preloaded rows (if any) in sync with every write.
        if let Some(rows) = &mut self.all_rows {
            let (vnode, key) = key.split_vnode_bytes();
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .insert(key, value.into_owned_row());
        }
        self.state_store
            .insert(key, value_bytes, None)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn delete(&mut self, key: TableKey<Bytes>, value: impl Row) {
        insane_mode_discard_point!();
        let value_bytes = self.row_serde.serialize(value).into();
        if let Some(rows) = &mut self.all_rows {
            let (vnode, key) = key.split_vnode();
            rows.get_mut(&vnode).expect("covered vnode").remove(key);
        }
        self.state_store
            .delete(key, value_bytes)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn update(&mut self, key_bytes: TableKey<Bytes>, old_value: impl Row, new_value: impl Row) {
        insane_mode_discard_point!();
        let new_value_bytes = self.row_serde.serialize(&new_value).into();
        let old_value_bytes = self.row_serde.serialize(old_value).into();
        if let Some(rows) = &mut self.all_rows {
            let (vnode, key) = key_bytes.split_vnode_bytes();
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .insert(key, new_value.into_owned_row());
        }
        self.state_store
            .insert(key_bytes, new_value_bytes, Some(old_value_bytes))
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Insert a row into the state table. A full row matching the column descs of the table
    /// must be provided.
    pub fn insert(&mut self, value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&value).project(pk_indices);
        if USE_WATERMARK_CACHE {
            self.watermark_cache.insert(&pk);
        }

        let key_bytes = self.serialize_pk(&pk);
        dispatch_value_indices!(&self.value_indices, [value], {
            self.row_store.insert(key_bytes, value)
        })
    }

    /// Delete a row from the state table. A full row of the old value must be provided.
    pub fn delete(&mut self, old_value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&old_value).project(pk_indices);
        if USE_WATERMARK_CACHE {
            self.watermark_cache.delete(&pk);
        }

        let key_bytes = self.serialize_pk(&pk);
        dispatch_value_indices!(&self.value_indices, [old_value], {
            self.row_store.delete(key_bytes, old_value)
        })
    }

    /// Update a row. The old and new values must have the same pk.
    pub fn update(&mut self, old_value: impl Row, new_value: impl Row) {
        let old_pk = (&old_value).project(self.pk_indices());
        let new_pk = (&new_value).project(self.pk_indices());
        debug_assert!(
            Row::eq(&old_pk, new_pk),
            "pk should not change: {old_pk:?} vs {new_pk:?}. {}",
            self.table_id
        );

        let key_bytes = self.serialize_pk(&new_pk);
        dispatch_value_indices!(&self.value_indices, [old_value, new_value], {
            self.row_store.update(key_bytes, old_value, new_value)
        })
    }

    /// Write a record (insert / delete / update) into the state table.
    pub fn write_record(&mut self, record: Record<impl Row>) {
        match record {
            Record::Insert { new_row } => self.insert(new_row),
            Record::Delete { old_row } => self.delete(old_row),
            Record::Update { old_row, new_row } => self.update(old_row, new_row),
        }
    }

    fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
        fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
    }

    /// Write a batch of rows with a `StreamChunk`, which must share the table's schema.
    #[allow(clippy::disallowed_methods)]
    pub fn write_chunk(&mut self, chunk: StreamChunk) {
        let chunk = if IS_REPLICATED {
            self.fill_non_output_indices(chunk)
        } else {
            chunk
        };

        let vnodes = self
            .distribution
            .compute_chunk_vnode(&chunk, &self.pk_indices);

        for (idx, optional_row) in chunk.rows_with_holes().enumerate() {
            let Some((op, row)) = optional_row else {
                continue;
            };
            let pk = row.project(&self.pk_indices);
            let vnode = vnodes[idx];
            let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, vnode);
            match op {
                Op::Insert | Op::UpdateInsert => {
                    if USE_WATERMARK_CACHE {
                        self.watermark_cache.insert(&pk);
                    }
                    dispatch_value_indices!(&self.value_indices, [row], {
                        self.row_store.insert(key_bytes, row);
                    });
                }
                Op::Delete | Op::UpdateDelete => {
                    if USE_WATERMARK_CACHE {
                        self.watermark_cache.delete(&pk);
                    }
                    dispatch_value_indices!(&self.value_indices, [row], {
                        self.row_store.delete(key_bytes, row);
                    });
                }
            }
        }
    }

    /// Update the watermark for state cleaning. The watermark is buffered and only takes
    /// effect on the next `commit`.
    pub fn update_watermark(&mut self, watermark: ScalarImpl) {
        trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark");
        self.pending_watermark = Some(watermark);
    }

    /// Get the last committed watermark of the state table.
    pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> {
        self.committed_watermark.as_ref()
    }

    pub async fn commit(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        self.commit_inner(new_epoch, None).await
    }

    #[cfg(test)]
    pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
        self.commit_assert_no_update_vnode_bitmap(new_epoch).await
    }

    pub async fn commit_assert_no_update_vnode_bitmap(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<()> {
        let post_commit = self.commit_inner(new_epoch, None).await?;
        post_commit.post_yield_barrier(None).await?;
        Ok(())
    }

    pub async fn commit_may_switch_consistent_op(
        &mut self,
        new_epoch: EpochPair,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        if self.op_consistency_level != op_consistency_level {
            if !cfg!(debug_assertions) {
                // The switch is only logged in non-debug builds.
                info!(
                    ?new_epoch,
                    prev_op_consistency_level = ?self.op_consistency_level,
                    ?op_consistency_level,
                    table_id = self.table_id.table_id,
                    "switch to new op consistency level"
                );
            }
            self.commit_inner(new_epoch, Some(op_consistency_level))
                .await
        } else {
            self.commit_inner(new_epoch, None).await
        }
    }

    async fn commit_inner(
        &mut self,
        new_epoch: EpochPair,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        assert!(!self.on_post_commit);
        assert_eq!(
            self.epoch.expect("should only be called after init").curr,
            new_epoch.prev
        );
        if let Some(new_consistency_level) = switch_consistent_op {
            assert_ne!(self.op_consistency_level, new_consistency_level);
            self.op_consistency_level = new_consistency_level;
        }
        trace!(
            table_id = %self.table_id,
            epoch = ?self.epoch,
            "commit state table"
        );

        let table_watermarks = self.commit_pending_watermark();
        self.row_store
            .seal_current_epoch(new_epoch.curr, table_watermarks, switch_consistent_op)
            .await?;
        self.epoch = Some(new_epoch);

        // Refill the watermark cache if it went out of sync: scan the lowest pks at or above
        // the committed watermark across all vnodes and rebuild the cache from them.
        if USE_WATERMARK_CACHE
            && !self.watermark_cache.is_synced()
            && let Some(ref watermark) = self.committed_watermark
        {
            let range: (Bound<Once<Datum>>, Bound<Once<Datum>>) =
                (Included(once(Some(watermark.clone()))), Unbounded);
            let mut pks = Vec::with_capacity(self.watermark_cache.capacity());
            {
                let mut streams = vec![];
                for vnode in self.vnodes().iter_vnodes() {
                    let stream = self
                        .iter_keyed_row_with_vnode(vnode, &range, PrefetchOptions::default())
                        .await?;
                    streams.push(Box::pin(stream));
                }
                let merged_stream = merge_sort(streams);
                pin_mut!(merged_stream);

                #[for_await]
                for entry in merged_stream.take(self.watermark_cache.capacity()) {
                    let keyed_row = entry?;
                    let pk = self.pk_serde.deserialize(keyed_row.key())?;
                    // Skip rows whose watermark column is NULL.
                    if !pk.is_null_at(self.clean_watermark_index_in_pk.unwrap_or(0) as usize) {
                        pks.push(pk);
                    }
                }
            }

            let mut filler = self.watermark_cache.begin_syncing();
            for pk in pks {
                filler.insert_unchecked(DefaultOrdered(pk), ());
            }
            filler.finish();

            let n_cache_entries = self.watermark_cache.len();
            if n_cache_entries < self.watermark_cache.capacity() {
                self.watermark_cache.set_table_row_count(n_cache_entries);
            }
        }

        self.on_post_commit = true;
        Ok(StateTablePostCommit { inner: self })
    }

    /// Commit the pending watermark: produce the table watermarks to seal with this epoch
    /// and record the watermark as committed.
    fn commit_pending_watermark(
        &mut self,
    ) -> Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)> {
        let watermark = self.pending_watermark.take()?;
        trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning");

        assert!(
            !self.pk_indices().is_empty(),
            "see pending watermark on empty pk"
        );
        let watermark_serializer = {
            match self.clean_watermark_index_in_pk {
                None => self.pk_serde.index(0),
                Some(clean_watermark_index_in_pk) => {
                    self.pk_serde.index(clean_watermark_index_in_pk as usize)
                }
            }
        };

        let watermark_type = match self.clean_watermark_index_in_pk {
            None => WatermarkSerdeType::PkPrefix,
            Some(clean_watermark_index_in_pk) => match clean_watermark_index_in_pk {
                0 => WatermarkSerdeType::PkPrefix,
                _ => WatermarkSerdeType::NonPkPrefix,
            },
        };

        // If the watermark cache is synced and its lowest key is above the watermark, there
        // is nothing to clean, so skip issuing the range delete.
        let should_clean_watermark = {
            {
                if USE_WATERMARK_CACHE && self.watermark_cache.is_synced() {
                    if let Some(key) = self.watermark_cache.lowest_key() {
                        watermark.as_scalar_ref_impl().default_cmp(&key).is_ge()
                    } else {
                        // The cache is synced and empty: there are no rows to clean.
                        false
                    }
                } else {
                    // Without a synced cache we cannot rule out stale rows, so always clean.
                    true
                }
            }
        };

        let watermark_suffix =
            serialize_pk(row::once(Some(watermark.clone())), &watermark_serializer);

        let seal_watermark = if should_clean_watermark {
            trace!(table_id = %self.table_id, watermark = ?watermark_suffix, vnodes = ?{
                self.vnodes().iter_vnodes().collect_vec()
            }, "delete range");

            let order_type = watermark_serializer.get_order_types().first().unwrap();

            if order_type.is_ascending() {
                Some((
                    WatermarkDirection::Ascending,
                    VnodeWatermark::new(
                        self.vnodes().clone(),
                        Bytes::copy_from_slice(watermark_suffix.as_ref()),
                    ),
                    watermark_type,
                ))
            } else {
                Some((
                    WatermarkDirection::Descending,
                    VnodeWatermark::new(
                        self.vnodes().clone(),
                        Bytes::copy_from_slice(watermark_suffix.as_ref()),
                    ),
                    watermark_type,
                ))
            }
        } else {
            None
        };
        self.committed_watermark = Some(watermark);

        // The cleaned range may include the lowest keys tracked by the watermark cache, so
        // clear the cache and let it resync on a later commit.
        if USE_WATERMARK_CACHE && seal_watermark.is_some() {
            self.watermark_cache.clear();
        }

        seal_watermark.map(|(direction, watermark, is_non_pk_prefix)| {
            (direction, vec![watermark], is_non_pk_prefix)
        })
    }

    pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.row_store.try_flush().await?;
        Ok(())
    }
}

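/// Streams of rows, keyed rows, and `(key, row)` pairs yielded by the iterator methods below.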
pub trait RowStream<'a> = Stream<Item = StreamExecutorResult<OwnedRow>> + 'a;
pub trait KeyedRowStream<'a> = Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a;
pub trait PkRowStream<'a, K> = Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a;

/// Key types that can be rebuilt from a vnode and the key payload: `Bytes` keeps the full
/// vnode-prefixed key, while `()` discards the key entirely.
pub trait FromVnodeBytes {
    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self;
}

impl FromVnodeBytes for Bytes {
    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self {
        prefix_slice_with_vnode(vnode, bytes)
    }
}

impl FromVnodeBytes for () {
    fn from_vnode_bytes(_vnode: VirtualNode, _bytes: &Bytes) -> Self {}
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Scan rows within the given `pk_range` under the given `vnode`.
    pub async fn iter_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_kv_with_pk_range::<()>(pk_range, vnode, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

    /// Like `iter_with_vnode`, but also yields the serialized key of each row.
    pub async fn iter_keyed_row_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    /// Like `iter_with_vnode`, but projects each row onto the output columns of a replicated
    /// table.
    pub async fn iter_with_vnode_and_output_indices(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        assert!(IS_REPLICATED);
        let stream = self
            .iter_with_vnode(vnode, pk_range, prefetch_options)
            .await?;
        Ok(stream.map(|row| row.map(|row| row.project(&self.output_indices).into_owned_row())))
    }
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn iter_kv<K: CopyFromSlice + FromVnodeBytes>(
        &self,
        vnode: VirtualNode,
        (start, end): (Bound<Bytes>, Bound<Bytes>),
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        // Fast path: iterate the preloaded rows of the vnode.
        if let Some(rows) = &self.all_rows {
            return Ok(futures::future::Either::Left(futures::stream::iter(
                rows.get(&vnode)
                    .expect("covered vnode")
                    .range((start, end))
                    .map(move |(key, value)| Ok((K::from_vnode_bytes(vnode, key), value.clone()))),
            )));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(futures::future::Either::Right(
            deserialize_keyed_row_stream(
                self.state_store
                    .iter(prefixed_range_with_vnode((start, end), vnode), read_options)
                    .await?,
                &*self.row_serde,
            ),
        ))
    }

    async fn rev_iter_kv<K: CopyFromSlice + FromVnodeBytes>(
        &self,
        vnode: VirtualNode,
        (start, end): (Bound<Bytes>, Bound<Bytes>),
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        // Fast path: iterate the preloaded rows of the vnode in reverse order.
        if let Some(rows) = &self.all_rows {
            return Ok(futures::future::Either::Left(futures::stream::iter(
                rows.get(&vnode)
                    .expect("covered vnode")
                    .range((start, end))
                    .rev()
                    .map(move |(key, value)| Ok((K::from_vnode_bytes(vnode, key), value.clone()))),
            )));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(futures::future::Either::Right(
            deserialize_keyed_row_stream(
                self.state_store
                    .rev_iter(prefixed_range_with_vnode((start, end), vnode), read_options)
                    .await?,
                &*self.row_serde,
            ),
        ))
    }
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Scan rows matching the given `pk_prefix`, restricted to `sub_range` within the prefix.
    /// All matching rows must reside in a single vnode, computed from the prefix.
    pub async fn iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        let stream = self
            .iter_with_prefix_inner::<false, ()>(pk_prefix, sub_range, prefetch_options)
            .await?;
        Ok(stream.map_ok(|(_, row)| row))
    }

    /// Get the row from a table that holds at most one row.
    pub async fn get_from_one_row_table(&self) -> StreamExecutorResult<Option<OwnedRow>> {
        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
        let stream = self
            .iter_with_prefix(row::empty(), sub_range, Default::default())
            .await?;
        pin_mut!(stream);

        if let Some(res) = stream.next().await {
            let value = res?.into_owned_row();
            assert!(stream.next().await.is_none());
            Ok(Some(value))
        } else {
            Ok(None)
        }
    }

    /// Get the first value from a table that holds at most one row.
    pub async fn get_from_one_value_table(&self) -> StreamExecutorResult<Option<ScalarImpl>> {
        Ok(self
            .get_from_one_row_table()
            .await?
            .and_then(|row| row[0].clone()))
    }

    /// Like `iter_with_prefix`, but also yields the serialized key of each row.
    pub async fn iter_keyed_row_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::<false, Bytes>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    /// Like `iter_with_prefix`, but scans in reverse order.
    pub async fn rev_iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::<true, ()>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

    async fn iter_with_prefix_inner<const REVERSE: bool, K: CopyFromSlice + FromVnodeBytes>(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
        let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);

        // The prefix must cover the distribution key, so the scan targets a single vnode.
        let vnode = self.compute_prefix_vnode(&pk_prefix);

        let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
        if self.prefix_hint_len != 0 {
            debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
        }
        // Construct the prefix hint for the bloom filter, if applicable.
        let prefix_hint = {
            if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() {
                None
            } else {
                let encoded_prefix_len = self
                    .pk_serde
                    .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;

                Some(Bytes::copy_from_slice(
                    &encoded_prefix[..encoded_prefix_len],
                ))
            }
        };

        trace!(
            table_id = %self.table_id(),
            ?prefix_hint, ?pk_prefix,
            ?pk_prefix_indices,
            iter_direction = if REVERSE { "reverse" } else { "forward" },
            "storage_iter_with_prefix"
        );

        let memcomparable_range =
            prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);

        Ok(if REVERSE {
            futures::future::Either::Left(
                self.row_store
                    .rev_iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
                    .await?,
            )
        } else {
            futures::future::Either::Right(
                self.row_store
                    .iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
                    .await?,
            )
        })
    }

    /// Scan raw `(key, row)` pairs within the given `pk_range` under the given `vnode`.
    async fn iter_kv_with_pk_range<'a, K: CopyFromSlice + FromVnodeBytes>(
        &'a self,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'a, K>> {
        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);

        self.row_store
            .iter_kv(vnode, memcomparable_range, None, prefetch_options)
            .await
    }

    #[cfg(test)]
    pub fn get_watermark_cache(&self) -> &StateTableWatermarkCache {
        &self.watermark_cache
    }
}

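/// Turn a raw state-store iterator into a stream of `(key, row)` pairs by deserializing each
/// value with the given row serde.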
fn deserialize_keyed_row_stream<'a, K: CopyFromSlice>(
    iter: impl StateStoreIter + 'a,
    deserializer: &'a impl ValueRowSerde,
) -> impl PkRowStream<'a, K> {
    iter.into_stream(move |(key, value)| {
        Ok((
            K::copy_from_slice(key.user_key.table_key.as_ref()),
            deserializer.deserialize(value).map(OwnedRow::new)?,
        ))
    })
    .map_err(Into::into)
}

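/// Convert a `pk_range` expressed over (a prefix of) the pk columns into a memcomparable
/// byte range.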
pub fn prefix_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    range: &(Bound<impl Row>, Bound<impl Row>),
) -> (Bound<Bytes>, Bound<Bytes>) {
    (
        start_range_to_memcomparable(pk_serde, &range.0),
        end_range_to_memcomparable(pk_serde, &range.1, None),
    )
}

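/// Convert a `pk_prefix` plus a `sub_range` within that prefix into a memcomparable byte
/// range, by chaining the prefix onto each bound.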
fn prefix_and_sub_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    sub_range: &(Bound<impl Row>, Bound<impl Row>),
    pk_prefix: impl Row,
) -> (Bound<Bytes>, Bound<Bytes>) {
    let (range_start, range_end) = sub_range;
    let prefix_serializer = pk_serde.prefix(pk_prefix.len());
    let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
    let start_range = match range_start {
        Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))),
        Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))),
        Unbounded => Bound::Included(Either::Right(&pk_prefix)),
    };
    let end_range = match range_end {
        Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)),
        Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)),
        Unbounded => Unbounded,
    };
    (
        start_range_to_memcomparable(pk_serde, &start_range),
        end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
    )
}

fn start_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => Unbounded,
        Included(r) => {
            let serialized = serialize_pk_prefix(r);

            Included(serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);

            start_bound_of_excluded_prefix(&serialized)
        }
    }
}

fn end_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
    serialized_pk_prefix: Option<Bytes>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => match serialized_pk_prefix {
            Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix),
            None => Unbounded,
        },
        Included(r) => {
            let serialized = serialize_pk_prefix(r);

            end_bound_of_prefix(&serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            Excluded(serialized)
        }
    }
}

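/// For replicated tables: expand a chunk over the output columns to the table's full schema,
/// filling the non-output columns with NULLs.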
fn fill_non_output_indices(
    i2o_mapping: &ColIndexMapping,
    data_types: &[DataType],
    chunk: StreamChunk,
) -> StreamChunk {
    let cardinality = chunk.cardinality();
    let (ops, columns, vis) = chunk.into_inner();
    let mut full_columns = Vec::with_capacity(data_types.len());
    for (i, data_type) in data_types.iter().enumerate() {
        if let Some(j) = i2o_mapping.try_map(i) {
            full_columns.push(columns[j].clone());
        } else {
            let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
            column_builder.append_n_null(cardinality);
            let column: ArrayRef = column_builder.finish().into();
            full_columns.push(column)
        }
    }
    let data_chunk = DataChunk::new(full_columns, vis);
    StreamChunk::from_parts(ops, data_chunk)
}

#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    fn check(actual: impl Debug, expect: Expect) {
        let actual = format!("{:#?}", actual);
        expect.assert_eq(&actual);
    }

    #[test]
    fn test_fill_non_output_indices() {
        let data_types = vec![DataType::Int32, DataType::Int32, DataType::Int32];
        let replicated_chunk = [OwnedRow::new(vec![
            Some(222_i32.into()),
            Some(2_i32.into()),
        ])];
        let replicated_chunk = StreamChunk::from_parts(
            vec![Op::Insert],
            DataChunk::from_rows(&replicated_chunk, &[DataType::Int32, DataType::Int32]),
        );
        let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
        let filled_chunk = fill_non_output_indices(&i2o_mapping, &data_types, replicated_chunk);
        check(
            filled_chunk,
            expect![[r#"
                StreamChunk { cardinality: 1, capacity: 1, data:
                +---+---+---+-----+
                | + | 2 |   | 222 |
                +---+---+---+-----+
                }"#]],
        );
    }
}