use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;
use std::ops::Bound;
use std::ops::Bound::*;
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use either::Either;
use foyer::Hint;
use futures::future::try_join_all;
use futures::{Stream, StreamExt, TryStreamExt, pin_mut};
use futures_async_stream::for_await;
use itertools::Itertools;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnDesc, ColumnId, TableId, TableOption, get_dist_key_in_pk_indices,
};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
use risingwave_common::row::{self, Once, OwnedRow, Row, RowExt, once};
use risingwave_common::types::{DataType, Datum, DefaultOrd, DefaultOrdered, ScalarImpl};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{
    CopyFromSlice, TableKey, end_bound_of_prefix, next_key, prefix_slice_with_vnode,
    prefixed_range_with_vnode, start_bound_of_excluded_prefix,
};
use risingwave_hummock_sdk::table_watermark::{
    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_pb::catalog::Table;
use risingwave_storage::StateStore;
use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::mem_table::MemTableError;
use risingwave_storage::row_serde::find_columns_by_ids;
use risingwave_storage::row_serde::row_serde_util::{
    deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode,
};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::*;
use risingwave_storage::table::merge_sort::merge_sort;
use risingwave_storage::table::{KeyedRow, TableDistribution};
use thiserror_ext::AsReport;
use tracing::{Instrument, trace};

use crate::cache::cache_may_stale;
use crate::common::state_cache::{StateCache, StateCacheFiller};
use crate::common::table::state_table_cache::StateTableWatermarkCache;
use crate::executor::StreamExecutorResult;

/// Capacity of the watermark cache, i.e. the maximum number of lowest keys kept per table.
const WATERMARK_CACHE_ENTRIES: usize = 16;

/// In "insane" consistency-testing mode, randomly discard ~30% of write operations at this
/// point to exercise inconsistency handling.
macro_rules! insane_mode_discard_point {
    () => {{
        use rand::Rng;
        if crate::consistency::insane() && rand::rng().random_bool(0.3) {
            return;
        }
    }};
}

/// `StateTableInner` is the interface for accessing relational data in KV (`StateStore`)
/// with row-based encoding.
pub struct StateTableInner<
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Id of this table.
    table_id: TableId,

    /// Storage of rows: writes go to the local state store, optionally shadowed by an
    /// in-memory copy of all rows.
    row_store: StateTableRowStore<S::Local, SD>,

    /// Global state store, used for epoch-level operations such as `try_wait_committed_epoch`.
    store: S,

    /// Current epoch pair. `None` until `init_epoch` is called.
    epoch: Option<EpochPair>,

    /// Serializer and deserializer of the primary key (memcomparable encoding).
    pk_serde: OrderedRowSerde,

    /// Indices of the primary key columns in the full row.
    pk_indices: Vec<usize>,

    /// Distribution of the state table over vnodes.
    distribution: TableDistribution,

    /// Number of leading pk columns used as the prefix hint for the bloom filter.
    prefix_hint_len: usize,

    /// If `Some`, rows are projected onto these indices before being stored as values;
    /// `None` means all columns are stored.
    value_indices: Option<Vec<usize>>,

    /// Watermark to be applied on the next commit, for state cleaning.
    pending_watermark: Option<ScalarImpl>,
    /// Latest watermark that has been committed.
    committed_watermark: Option<ScalarImpl>,
    /// Cache of the lowest keys, used to avoid unnecessary watermark-based state cleaning.
    watermark_cache: StateTableWatermarkCache,

    /// Data types of all columns of the table.
    data_types: Vec<DataType>,

    /// Mapping from table column index to output column index, used by replicated tables.
    i2o_mapping: ColIndexMapping,

    /// Indices of the output columns, used by replicated tables.
    pub output_indices: Vec<usize>,

    /// Current op consistency level.
    op_consistency_level: StateTableOpConsistencyLevel,

    /// Index of the state-cleaning watermark column in the pk; `None` means the first
    /// pk column.
    clean_watermark_index_in_pk: Option<i32>,

    /// Whether the last `commit` still has a pending `StateTablePostCommit` to be resolved.
    on_post_commit: bool,
}

/// `StateTable` uses `BasicSerde` as the value serde by default.
pub type StateTable<S> = StateTableInner<S, BasicSerde>;
/// A state table that replicates data from an upstream table, e.g. for backfill.
pub type ReplicatedStateTable<S, SD> = StateTableInner<S, SD, true>;
/// A state table with the watermark cache enabled.
pub type WatermarkCacheStateTable<S> = StateTableInner<S, BasicSerde, false, true>;
pub type WatermarkCacheParameterizedStateTable<S, const USE_WATERMARK_CACHE: bool> =
    StateTableInner<S, BasicSerde, false, USE_WATERMARK_CACHE>;

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Initialize the state table with the given epoch. Must be called exactly once,
    /// before any read or write.
    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.row_store
            .init(epoch, self.distribution.vnodes())
            .await?;
        assert_eq!(None, self.epoch.replace(epoch), "should not init twice");
        Ok(())
    }

    pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
        self.store
            .try_wait_epoch(
                HummockReadEpoch::Committed(prev_epoch),
                TryWaitEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await
    }

    pub fn state_store(&self) -> &S {
        &self.store
    }
}

/// Build an `OpConsistencyLevel` whose old-value sanity check compares values at the row
/// level: two serialized values are considered equal if their bytes match, or if they
/// deserialize to the same row.
fn consistent_old_value_op(
    row_serde: Arc<impl ValueRowSerde>,
    is_log_store: bool,
) -> OpConsistencyLevel {
    OpConsistencyLevel::ConsistentOldValue {
        check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
            if first == second {
                return true;
            }
            let first = match row_serde.deserialize(first) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?first, "fail to deserialize serialized value");
                    return false;
                }
            };
            let second = match row_serde.deserialize(second) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?second, "fail to deserialize serialized value");
                    return false;
                }
            };
            if first != second {
                error!(first = ?first, second = ?second, "sanity check fail");
                false
            } else {
                true
            }
        }),
        is_log_store,
    }
}

/// Project the given row variables onto `value_indices` if it is `Some`, then run `$body`
/// with the (possibly projected) rows in scope.
macro_rules! dispatch_value_indices {
    ($value_indices:expr, [$($row_var_name:ident),+], $body:expr) => {
        if let Some(value_indices) = $value_indices {
            $(
                let $row_var_name = $row_var_name.project(value_indices);
            )+
            $body
        } else {
            $body
        }
    };
}

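// For instance, `StateTableInner::insert` below invokes the macro as:
//
//     dispatch_value_indices!(&self.value_indices, [value], {
//         self.row_store.insert(key_bytes, value)
//     })
//
// so the row is projected onto the value columns only when a projection is configured.
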
/// Storage layer of the state table. All writes go through `state_store`; when `all_rows`
/// is `Some`, a full copy of the table is additionally kept in memory and serves reads.
struct StateTableRowStore<LS: LocalStateStore, SD: ValueRowSerde> {
    state_store: LS,
    /// When row preloading is enabled, all rows of the table, keyed first by vnode and
    /// then by the memcomparable pk.
    all_rows: Option<HashMap<VirtualNode, BTreeMap<Bytes, OwnedRow>>>,

    table_id: TableId,
    table_option: TableOption,
    row_serde: Arc<SD>,
    /// Kept for error reporting, e.g. decoding keys in mem-table errors.
    pk_serde: OrderedRowSerde,
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    /// Reload all rows of the given vnodes into memory, if row preloading is enabled.
    async fn may_reload_all_rows(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        if let Some(rows) = &mut self.all_rows {
            rows.clear();
            let start_time = Instant::now();
            *rows = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| {
                let state_store = &self.state_store;
                let retention_seconds = self.table_option.retention_seconds;
                let row_serde = &self.row_serde;
                async move {
                    let mut rows = BTreeMap::new();
                    let memcomparable_range_with_vnode =
                        prefixed_range_with_vnode::<Bytes>(.., vnode);
                    let stream = deserialize_keyed_row_stream::<Bytes>(
                        state_store
                            .iter(
                                memcomparable_range_with_vnode,
                                ReadOptions {
                                    prefix_hint: None,
                                    prefetch_options: Default::default(),
                                    cache_policy: Default::default(),
                                    retention_seconds,
                                },
                            )
                            .await?,
                        &**row_serde,
                    );
                    pin_mut!(stream);
                    while let Some((encoded_key, row)) = stream.try_next().await? {
                        let key = TableKey(encoded_key);
                        let (iter_vnode, key) = key.split_vnode_bytes();
                        assert_eq!(vnode, iter_vnode);
                        rows.try_insert(key, row).expect("non-duplicated");
                    }
                    Ok((vnode, rows)) as StreamExecutorResult<_>
                }
            }))
            .await?
            .into_iter()
            .collect();
            // Only log reload statistics in release builds.
            if !cfg!(debug_assertions) {
                info!(table_id = %self.table_id, vnode_count = vnode_bitmap.count_ones(), duration = ?start_time.elapsed(), "finished reloading all rows");
            }
        }
        Ok(())
    }

    async fn init(&mut self, epoch: EpochPair, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        self.state_store.init(InitOptions::new(epoch)).await?;
        self.may_reload_all_rows(vnode_bitmap).await
    }

    async fn update_vnode_bitmap(
        &mut self,
        vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<Arc<Bitmap>> {
        let prev_vnodes = self.state_store.update_vnode_bitmap(vnodes.clone()).await?;
        self.may_reload_all_rows(&vnodes).await?;
        Ok(prev_vnodes)
    }

    async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.state_store.try_flush().await?;
        Ok(())
    }

    async fn seal_current_epoch(
        &mut self,
        next_epoch: u64,
        table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<()> {
        // Apply state cleaning to the in-memory copy, so that it stays consistent with
        // what the table watermark will eventually remove from the state store.
        if let Some((direction, watermarks, serde_type)) = &table_watermarks
            && let Some(rows) = &mut self.all_rows
        {
            match serde_type {
                WatermarkSerdeType::PkPrefix => {
                    for vnode_watermark in watermarks {
                        match direction {
                            WatermarkDirection::Ascending => {
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    // Keep only keys at or above the watermark.
                                    *rows = rows.split_off(vnode_watermark.watermark());
                                }
                            }
                            WatermarkDirection::Descending => {
                                let split_off_key = next_key(vnode_watermark.watermark());
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    // Keep only keys at or below the watermark.
                                    rows.split_off(split_off_key.as_slice());
                                }
                            }
                        }
                    }
                }
                WatermarkSerdeType::NonPkPrefix => {
                    warn!(table_id = %self.table_id, "row preloading disabled because a non-pk-prefix watermark was written");
                    self.all_rows = None;
                }
            }
        }
        self.state_store
            .flush()
            .instrument(tracing::info_span!("state_table_flush"))
            .await?;
        let switch_op_consistency_level =
            switch_consistent_op.map(|new_consistency_level| match new_consistency_level {
                StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
                StateTableOpConsistencyLevel::ConsistentOldValue => {
                    consistent_old_value_op(self.row_serde.clone(), false)
                }
                StateTableOpConsistencyLevel::LogStoreEnabled => {
                    consistent_old_value_op(self.row_serde.clone(), true)
                }
            });
        self.state_store.seal_current_epoch(
            next_epoch,
            SealCurrentEpochOptions {
                table_watermarks,
                switch_op_consistency_level,
            },
        );
        Ok(())
    }
}

#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
    /// Ops are not sanity-checked.
    Inconsistent,
    /// Ops are consistent with the old value:
    /// - an insert must not overwrite an existing key;
    /// - a delete or update must target an existing key whose old value matches.
    ConsistentOldValue,
    /// Same consistency requirement as `ConsistentOldValue`, but the state table also
    /// flushes the old value of each operation, for the log store.
    LogStoreEnabled,
}

/// Builder of `StateTableInner`. The `PreloadAllRow` type parameter tracks whether the
/// row-preloading policy has been decided: `()` means undecided, `bool` means decided.
pub struct StateTableBuilder<
    'a,
    S,
    SD,
    const IS_REPLICATED: bool,
    const USE_WATERMARK_CACHE: bool,
    PreloadAllRow,
> {
    table_catalog: &'a Table,
    store: S,
    vnodes: Option<Arc<Bitmap>>,
    op_consistency_level: Option<StateTableOpConsistencyLevel>,
    output_column_ids: Option<Vec<ColumnId>>,
    preload_all_rows: PreloadAllRow,

    _serde: PhantomData<SD>,
}

impl<
    'a,
    S: StateStore,
    SD: ValueRowSerde,
    const IS_REPLICATED: bool,
    const USE_WATERMARK_CACHE: bool,
> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, ()>
{
    pub fn new(table_catalog: &'a Table, store: S, vnodes: Option<Arc<Bitmap>>) -> Self {
        Self {
            table_catalog,
            store,
            vnodes,
            op_consistency_level: None,
            output_column_ids: None,
            preload_all_rows: (),
            _serde: Default::default(),
        }
    }

    fn with_preload_all_rows(
        self,
        preload_all_rows: bool,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, bool> {
        StateTableBuilder {
            table_catalog: self.table_catalog,
            store: self.store,
            vnodes: self.vnodes,
            op_consistency_level: self.op_consistency_level,
            output_column_ids: self.output_column_ids,
            preload_all_rows,
            _serde: Default::default(),
        }
    }

    /// Decide row preloading from the streaming config: when preloading is enabled by
    /// default, the blacklist opts tables out; otherwise the whitelist opts tables in.
    pub fn enable_preload_all_rows_by_config(
        self,
        config: &StreamingConfig,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, bool> {
        let developer = &config.developer;
        let preload_all_rows = if developer.default_enable_mem_preload_state_table {
            !developer
                .mem_preload_state_table_ids_blacklist
                .contains(&self.table_catalog.id)
        } else {
            developer
                .mem_preload_state_table_ids_whitelist
                .contains(&self.table_catalog.id)
        };
        self.with_preload_all_rows(preload_all_rows)
    }

    pub fn forbid_preload_all_rows(
        self,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, bool> {
        self.with_preload_all_rows(false)
    }
}

impl<
    'a,
    S: StateStore,
    SD: ValueRowSerde,
    const IS_REPLICATED: bool,
    const USE_WATERMARK_CACHE: bool,
    PreloadAllRow,
> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, PreloadAllRow>
{
    pub fn with_op_consistency_level(
        mut self,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> Self {
        self.op_consistency_level = Some(op_consistency_level);
        self
    }

    pub fn with_output_column_ids(mut self, output_column_ids: Vec<ColumnId>) -> Self {
        self.output_column_ids = Some(output_column_ids);
        self
    }
}

impl<
    'a,
    S: StateStore,
    SD: ValueRowSerde,
    const IS_REPLICATED: bool,
    const USE_WATERMARK_CACHE: bool,
> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, bool>
{
    pub async fn build(self) -> StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE> {
        let mut preload_all_rows = self.preload_all_rows;
        if preload_all_rows
            && let Err(e) =
                risingwave_common::license::Feature::StateTableMemoryPreload.check_available()
        {
            warn!(table_id = self.table_catalog.id, e = %e.as_report(), "table is configured to preload rows into memory, but the feature is not available under the current license");
            preload_all_rows = false;
        }
        StateTableInner::from_table_catalog_inner(
            self.table_catalog,
            self.store,
            self.vnodes,
            self.op_consistency_level
                .unwrap_or(StateTableOpConsistencyLevel::ConsistentOldValue),
            self.output_column_ids.unwrap_or_default(),
            preload_all_rows,
        )
        .await
    }
}

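// A minimal usage sketch of the builder (hypothetical caller code; `table_catalog`,
// `store`, `vnodes` and `streaming_config` are assumed to be in scope):
//
//     let table: StateTable<_> = StateTableBuilder::new(&table_catalog, store, vnodes)
//         .enable_preload_all_rows_by_config(&streaming_config)
//         .build()
//         .await;
//
// `build()` falls back to `ConsistentOldValue` when no consistency level is given.
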
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create a state table directly from the catalog, used in tests.
    #[cfg(any(test, feature = "test"))]
    pub async fn from_table_catalog(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    /// Create a state table whose operations are not sanity-checked against old values.
    pub async fn from_table_catalog_inconsistent_op(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    /// Create a state table from the `table_catalog`.
    async fn from_table_catalog_inner(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        op_consistency_level: StateTableOpConsistencyLevel,
        output_column_ids: Vec<ColumnId>,
        preload_all_rows: bool,
    ) -> Self {
        let table_id = TableId::new(table_catalog.id);
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();
        let data_types: Vec<DataType> = table_catalog
            .columns
            .iter()
            .map(|col| {
                col.get_column_desc()
                    .unwrap()
                    .get_column_type()
                    .unwrap()
                    .into()
            })
            .collect();
        let order_types: Vec<OrderType> = table_catalog
            .pk
            .iter()
            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
            .collect();
        let dist_key_indices: Vec<usize> = table_catalog
            .distribution_key
            .iter()
            .map(|dist_index| *dist_index as usize)
            .collect();

        let pk_indices = table_catalog
            .pk
            .iter()
            .map(|col_order| col_order.column_index as usize)
            .collect_vec();

        // Prefer `dist_key_in_pk` from the catalog; fall back to computing it from the
        // distribution key and pk indices when it is absent.
        let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() {
            get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices).unwrap()
        } else {
            table_catalog
                .get_dist_key_in_pk()
                .iter()
                .map(|idx| *idx as usize)
                .collect()
        };

        let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
            let vnode_col_idx = *idx as usize;
            pk_indices.iter().position(|&i| vnode_col_idx == i)
        });

        let distribution =
            TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
        assert_eq!(
            distribution.vnode_count(),
            table_catalog.vnode_count(),
            "vnode count mismatch, scanning table {} under wrong distribution?",
            table_catalog.name,
        );

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);

        let input_value_indices = table_catalog
            .value_indices
            .iter()
            .map(|val| *val as usize)
            .collect_vec();

        let no_shuffle_value_indices = (0..table_columns.len()).collect_vec();

        // Use `None` when the value columns are exactly all table columns in order, so
        // that no projection is needed on the write path.
        let value_indices = match input_value_indices.len() == table_columns.len()
            && input_value_indices == no_shuffle_value_indices
        {
            true => None,
            false => Some(input_value_indices),
        };
        let prefix_hint_len = table_catalog.read_prefix_len_hint as usize;

        let row_serde = Arc::new(SD::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.clone().into_boxed_slice()),
        ));

        let state_table_op_consistency_level = op_consistency_level;
        let op_consistency_level = match op_consistency_level {
            StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
            StateTableOpConsistencyLevel::ConsistentOldValue => {
                consistent_old_value_op(row_serde.clone(), false)
            }
            StateTableOpConsistencyLevel::LogStoreEnabled => {
                consistent_old_value_op(row_serde.clone(), true)
            }
        };

        let table_option = TableOption::new(table_catalog.retention_seconds);
        let new_local_options = if IS_REPLICATED {
            NewLocalOptions::new_replicated(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
            )
        } else {
            NewLocalOptions::new(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
                true,
            )
        };
        let local_state_store = store.new_local(new_local_options).await;

        // Sanity check: a versioned (schema-changeable) table must use a column-aware
        // row serde, and vice versa.
        assert_eq!(
            table_catalog.version.is_some(),
            row_serde.kind().is_column_aware()
        );

        // Serde of the state-cleaning watermark column, if the table has a pk.
        let watermark_serde = if pk_indices.is_empty() {
            None
        } else {
            match table_catalog.clean_watermark_index_in_pk {
                None => Some(pk_serde.index(0)),
                Some(clean_watermark_index_in_pk) => {
                    Some(pk_serde.index(clean_watermark_index_in_pk as usize))
                }
            }
        };
        let max_watermark_of_vnodes = distribution
            .vnodes()
            .iter_vnodes()
            .filter_map(|vnode| local_state_store.get_table_watermark(vnode))
            .max();
        let committed_watermark = if let Some(deser) = watermark_serde
            && let Some(max_watermark) = max_watermark_of_vnodes
        {
            let deserialized = deser.deserialize(&max_watermark).ok().and_then(|row| {
                assert!(row.len() == 1);
                row[0].clone()
            });
            if deserialized.is_none() {
                tracing::error!(
                    vnodes = ?distribution.vnodes(),
                    watermark = ?max_watermark,
                    "Failed to deserialize persisted watermark from state store.",
                );
            }
            deserialized
        } else {
            None
        };

        let watermark_cache = if USE_WATERMARK_CACHE {
            StateTableWatermarkCache::new(WATERMARK_CACHE_ENTRIES)
        } else {
            StateTableWatermarkCache::new(0)
        };

        // Build the output mapping, used by replicated tables.
        let output_column_ids_to_input_idx = output_column_ids
            .iter()
            .enumerate()
            .map(|(pos, id)| (*id, pos))
            .collect::<HashMap<_, _>>();

        let columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|c| c.column_desc.as_ref().unwrap().into())
            .collect_vec();

        // Compute the mapping from table column index to output column index; columns
        // that are not in the output map to `None`.
        let mut i2o_mapping = vec![None; columns.len()];
        for (i, column) in columns.iter().enumerate() {
            if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
                i2o_mapping[i] = Some(*pos);
            }
        }
        let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());

        // Compute the output indices.
        let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);

        Self {
            table_id,
            row_store: StateTableRowStore {
                all_rows: preload_all_rows.then(HashMap::new),
                table_option,
                state_store: local_state_store,
                row_serde,
                pk_serde: pk_serde.clone(),
                table_id,
            },
            store,
            epoch: None,
            pk_serde,
            pk_indices,
            distribution,
            prefix_hint_len,
            value_indices,
            pending_watermark: None,
            committed_watermark,
            watermark_cache,
            data_types,
            output_indices,
            i2o_mapping,
            op_consistency_level: state_table_op_consistency_level,
            clean_watermark_index_in_pk: table_catalog.clean_watermark_index_in_pk,
            on_post_commit: false,
        }
    }

    pub fn get_data_types(&self) -> &[DataType] {
        &self.data_types
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    /// Get the vnode value of the given (prefix of the) primary key.
    fn compute_prefix_vnode(&self, pk_prefix: &impl Row) -> VirtualNode {
        self.distribution
            .try_compute_vnode_by_pk_prefix(pk_prefix)
            .expect("For streaming, the given prefix must be enough to calculate the vnode")
    }

    /// Get the vnode value of the given primary key.
    pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
        self.distribution.compute_vnode_by_pk(pk)
    }

    pub fn pk_indices(&self) -> &[usize] {
        &self.pk_indices
    }

    /// Get the indices of the primary key columns in the output columns. Returns `None`
    /// if any of the primary key columns is not in the output columns.
    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
        assert!(IS_REPLICATED);
        self.pk_indices
            .iter()
            .map(|&i| self.output_indices.iter().position(|&j| i == j))
            .collect()
    }

    pub fn pk_serde(&self) -> &OrderedRowSerde {
        &self.pk_serde
    }

    pub fn vnodes(&self) -> &Arc<Bitmap> {
        self.distribution.vnodes()
    }

    pub fn value_indices(&self) -> &Option<Vec<usize>> {
        &self.value_indices
    }

    pub fn is_consistent_op(&self) -> bool {
        matches!(
            self.op_consistency_level,
            StateTableOpConsistencyLevel::ConsistentOldValue
                | StateTableOpConsistencyLevel::LogStoreEnabled
        )
    }
}

impl<S, SD, const USE_WATERMARK_CACHE: bool> StateTableInner<S, SD, true, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create a replicated state table, e.g. for backfill executors.
    pub async fn new_replicated(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            // Replicated tables replay writes from upstream, so old-value consistency
            // checks are not applicable.
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .with_output_column_ids(output_column_ids)
            .forbid_preload_all_rows()
            .build()
            .await
    }
}

// Point get
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Get a single row from the state table.
    pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        let row = self.row_store.get(serialized_pk, prefix_hint).await?;
        match row {
            Some(row) => {
                if IS_REPLICATED {
                    // Replicated tables only expose the output columns, so project the row.
                    let row = row.project(&self.output_indices);
                    Ok(Some(row.into_owned_row()))
                } else {
                    Ok(Some(row))
                }
            }
            None => Ok(None),
        }
    }

    /// Check whether the given pk exists in the state table.
    pub async fn exists(&self, pk: impl Row) -> StreamExecutorResult<bool> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        self.row_store.exists(serialized_pk, prefix_hint).await
    }

    /// Serialize the given primary key to bytes, prefixed with its vnode.
    fn serialize_pk(&self, pk: &impl Row) -> TableKey<Bytes> {
        assert!(pk.len() <= self.pk_indices.len());
        serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk))
    }

    fn serialize_pk_and_get_prefix_hint(&self, pk: &impl Row) -> (TableKey<Bytes>, Option<Bytes>) {
        let serialized_pk = self.serialize_pk(&pk);
        // Construct a bloom-filter prefix hint only when the full hint prefix is given.
        let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() {
            Some(serialized_pk.slice(VirtualNode::SIZE..))
        } else {
            #[cfg(debug_assertions)]
            if self.prefix_hint_len != 0 {
                warn!(
                    "prefix_hint_len is not equal to pk.len(), may not be able to utilize bloom filter"
                );
            }
            None
        };
        (serialized_pk, prefix_hint)
    }
}

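// A minimal point-get sketch (hypothetical caller code; `table` and a pk row `pk` are
// assumed to be in scope):
//
//     if let Some(row) = table.get_row(&pk).await? {
//         // `row` is the stored row for this pk (projected to the output columns
//         // for replicated tables).
//     }
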
impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn get(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<Option<OwnedRow>> {
        // Serve the read from the in-memory copy when all rows are preloaded.
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode();
            return Ok(rows.get(&vnode).expect("covered vnode").get(key).cloned());
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };

        self.state_store
            .on_key_value(key_bytes, read_options, move |_, value| {
                let row = self.row_serde.deserialize(value)?;
                Ok(OwnedRow::new(row))
            })
            .await
            .map_err(Into::into)
    }

    async fn exists(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<bool> {
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode();
            return Ok(rows.get(&vnode).expect("covered vnode").contains_key(key));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };
        let result = self
            .state_store
            .on_key_value(key_bytes, read_options, move |_, _| Ok(()))
            .await?;
        Ok(result.is_some())
    }
}

/// A handle returned by `commit`. It forces the caller to finish the commit by calling
/// `post_yield_barrier` after the barrier has been yielded, possibly updating the vnode
/// bitmap at the same time.
#[must_use]
pub struct StateTablePostCommit<
    'a,
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    inner: &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
}

impl<'a, S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTablePostCommit<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Finish the commit. When `new_vnodes` is `Some`, update the vnode bitmap and return
    /// `((new_vnodes, old_vnodes, table), cache_may_stale)`.
    pub async fn post_yield_barrier(
        mut self,
        new_vnodes: Option<Arc<Bitmap>>,
    ) -> StreamExecutorResult<
        Option<(
            (
                Arc<Bitmap>,
                Arc<Bitmap>,
                &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
            ),
            bool,
        )>,
    > {
        self.inner.on_post_commit = false;
        Ok(if let Some(new_vnodes) = new_vnodes {
            let (old_vnodes, cache_may_stale) =
                self.update_vnode_bitmap(new_vnodes.clone()).await?;
            Some(((new_vnodes, old_vnodes, self.inner), cache_may_stale))
        } else {
            None
        })
    }

    pub fn inner(&self) -> &StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE> {
        &*self.inner
    }

    /// Update the vnode bitmap of the state table, returning the previous vnode bitmap
    /// and whether the cache may be stale.
    async fn update_vnode_bitmap(
        &mut self,
        new_vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<(Arc<Bitmap>, bool)> {
        let prev_vnodes = self
            .inner
            .row_store
            .update_vnode_bitmap(new_vnodes.clone())
            .await?;
        assert_eq!(
            &prev_vnodes,
            self.inner.vnodes(),
            "state table and state store vnode bitmap mismatches"
        );

        if self.inner.distribution.is_singleton() {
            assert_eq!(
                &new_vnodes,
                self.inner.vnodes(),
                "should not update vnode bitmap for singleton table"
            );
        }
        assert_eq!(self.inner.vnodes().len(), new_vnodes.len());

        let cache_may_stale = cache_may_stale(self.inner.vnodes(), &new_vnodes);

        if cache_may_stale {
            self.inner.pending_watermark = None;
            if USE_WATERMARK_CACHE {
                self.inner.watermark_cache.clear();
            }
        }

        Ok((
            self.inner.distribution.update_vnode_bitmap(new_vnodes),
            cache_may_stale,
        ))
    }
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    fn handle_mem_table_error(&self, e: StorageError) {
        let e = match e.into_inner() {
            ErrorKind::MemTable(e) => e,
            _ => unreachable!("should only get memtable error"),
        };
        match *e {
            MemTableError::InconsistentOperation { key, prev, new, .. } => {
                let (vnode, key) = deserialize_pk_with_vnode(&key, &self.pk_serde).unwrap();
                panic!(
                    "mem-table operation inconsistent! table_id: {}, vnode: {}, key: {:?}, prev: {}, new: {}",
                    self.table_id,
                    vnode,
                    &key,
                    prev.debug_fmt(&*self.row_serde),
                    new.debug_fmt(&*self.row_serde),
                )
            }
        }
    }

    fn insert(&mut self, key: TableKey<Bytes>, value: impl Row) {
        insane_mode_discard_point!();
        let value_bytes = self.row_serde.serialize(&value).into();
        if let Some(rows) = &mut self.all_rows {
            let (vnode, key) = key.split_vnode_bytes();
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .insert(key, value.into_owned_row());
        }
        self.state_store
            .insert(key, value_bytes, None)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn delete(&mut self, key: TableKey<Bytes>, value: impl Row) {
        insane_mode_discard_point!();
        let value_bytes = self.row_serde.serialize(value).into();
        if let Some(rows) = &mut self.all_rows {
            let (vnode, key) = key.split_vnode();
            rows.get_mut(&vnode).expect("covered vnode").remove(key);
        }
        self.state_store
            .delete(key, value_bytes)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn update(&mut self, key_bytes: TableKey<Bytes>, old_value: impl Row, new_value: impl Row) {
        insane_mode_discard_point!();
        let new_value_bytes = self.row_serde.serialize(&new_value).into();
        let old_value_bytes = self.row_serde.serialize(old_value).into();
        if let Some(rows) = &mut self.all_rows {
            let (vnode, key) = key_bytes.split_vnode_bytes();
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .insert(key, new_value.into_owned_row());
        }
        self.state_store
            .insert(key_bytes, new_value_bytes, Some(old_value_bytes))
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Insert a row into the state table. `value` must be a full row, from which the pk
    /// is extracted.
    pub fn insert(&mut self, value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&value).project(pk_indices);
        if USE_WATERMARK_CACHE {
            self.watermark_cache.insert(&pk);
        }

        let key_bytes = self.serialize_pk(&pk);
        dispatch_value_indices!(&self.value_indices, [value], {
            self.row_store.insert(key_bytes, value)
        })
    }

    /// Delete a row from the state table. `old_value` must be a full row.
    pub fn delete(&mut self, old_value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&old_value).project(pk_indices);
        if USE_WATERMARK_CACHE {
            self.watermark_cache.delete(&pk);
        }

        let key_bytes = self.serialize_pk(&pk);
        dispatch_value_indices!(&self.value_indices, [old_value], {
            self.row_store.delete(key_bytes, old_value)
        })
    }

    /// Update a row. The old and new values must have the same pk.
    pub fn update(&mut self, old_value: impl Row, new_value: impl Row) {
        let old_pk = (&old_value).project(self.pk_indices());
        let new_pk = (&new_value).project(self.pk_indices());
        debug_assert!(
            Row::eq(&old_pk, new_pk),
            "pk should not change: {old_pk:?} vs {new_pk:?}, table_id: {}",
            self.table_id
        );

        let key_bytes = self.serialize_pk(&new_pk);
        dispatch_value_indices!(&self.value_indices, [old_value, new_value], {
            self.row_store.update(key_bytes, old_value, new_value)
        })
    }

    /// Write a record (insert / delete / update) into the state table.
    pub fn write_record(&mut self, record: Record<impl Row>) {
        match record {
            Record::Insert { new_row } => self.insert(new_row),
            Record::Delete { old_row } => self.delete(old_row),
            Record::Update { old_row, new_row } => self.update(old_row, new_row),
        }
    }

    fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
        fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
    }

    /// Write a batch of records to the state table.
    #[allow(clippy::disallowed_methods)]
    pub fn write_chunk(&mut self, chunk: StreamChunk) {
        let chunk = if IS_REPLICATED {
            self.fill_non_output_indices(chunk)
        } else {
            chunk
        };

        let vnodes = self
            .distribution
            .compute_chunk_vnode(&chunk, &self.pk_indices);

        for (idx, optional_row) in chunk.rows_with_holes().enumerate() {
            let Some((op, row)) = optional_row else {
                continue;
            };
            let pk = row.project(&self.pk_indices);
            let vnode = vnodes[idx];
            let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, vnode);
            match op {
                Op::Insert | Op::UpdateInsert => {
                    if USE_WATERMARK_CACHE {
                        self.watermark_cache.insert(&pk);
                    }
                    dispatch_value_indices!(&self.value_indices, [row], {
                        self.row_store.insert(key_bytes, row);
                    });
                }
                Op::Delete | Op::UpdateDelete => {
                    if USE_WATERMARK_CACHE {
                        self.watermark_cache.delete(&pk);
                    }
                    dispatch_value_indices!(&self.value_indices, [row], {
                        self.row_store.delete(key_bytes, row);
                    });
                }
            }
        }
    }

    /// Update the watermark used for state cleaning. The watermark is buffered and only
    /// applied on the next `commit`.
    pub fn update_watermark(&mut self, watermark: ScalarImpl) {
        trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark");
        self.pending_watermark = Some(watermark);
    }

    /// Get the latest committed watermark of the state table.
    pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> {
        self.committed_watermark.as_ref()
    }

    pub async fn commit(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        self.commit_inner(new_epoch, None).await
    }

    #[cfg(test)]
    pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
        self.commit_assert_no_update_vnode_bitmap(new_epoch).await
    }

    pub async fn commit_assert_no_update_vnode_bitmap(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<()> {
        let post_commit = self.commit_inner(new_epoch, None).await?;
        post_commit.post_yield_barrier(None).await?;
        Ok(())
    }

    pub async fn commit_may_switch_consistent_op(
        &mut self,
        new_epoch: EpochPair,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        if self.op_consistency_level != op_consistency_level {
            // Only log in release builds.
            if !cfg!(debug_assertions) {
                info!(
                    ?new_epoch,
                    prev_op_consistency_level = ?self.op_consistency_level,
                    ?op_consistency_level,
                    table_id = %self.table_id,
                    "switch to new op consistency level"
                );
            }
            self.commit_inner(new_epoch, Some(op_consistency_level))
                .await
        } else {
            self.commit_inner(new_epoch, None).await
        }
    }

    async fn commit_inner(
        &mut self,
        new_epoch: EpochPair,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        assert!(!self.on_post_commit);
        assert_eq!(
            self.epoch.expect("should only be called after init").curr,
            new_epoch.prev
        );
        if let Some(new_consistency_level) = switch_consistent_op {
            assert_ne!(self.op_consistency_level, new_consistency_level);
            self.op_consistency_level = new_consistency_level;
        }
        trace!(
            table_id = %self.table_id,
            epoch = ?self.epoch,
            "commit state table"
        );

        let table_watermarks = self.commit_pending_watermark();
        self.row_store
            .seal_current_epoch(new_epoch.curr, table_watermarks, switch_consistent_op)
            .await?;
        self.epoch = Some(new_epoch);

        // Refill the watermark cache from storage if it has gone out of sync.
        if USE_WATERMARK_CACHE
            && !self.watermark_cache.is_synced()
            && let Some(ref watermark) = self.committed_watermark
        {
            let range: (Bound<Once<Datum>>, Bound<Once<Datum>>) =
                (Included(once(Some(watermark.clone()))), Unbounded);
            // Scan all vnodes merged in pk order, taking the first `capacity` keys at or
            // above the committed watermark.
            let mut pks = Vec::with_capacity(self.watermark_cache.capacity());
            {
                let mut streams = vec![];
                for vnode in self.vnodes().iter_vnodes() {
                    let stream = self
                        .iter_keyed_row_with_vnode(vnode, &range, PrefetchOptions::default())
                        .await?;
                    streams.push(Box::pin(stream));
                }
                let merged_stream = merge_sort(streams);
                pin_mut!(merged_stream);

                #[for_await]
                for entry in merged_stream.take(self.watermark_cache.capacity()) {
                    let keyed_row = entry?;
                    let pk = self.pk_serde.deserialize(keyed_row.key())?;
                    // Skip keys whose watermark column is NULL.
                    if !pk.is_null_at(self.clean_watermark_index_in_pk.unwrap_or(0) as usize) {
                        pks.push(pk);
                    }
                }
            }

            let mut filler = self.watermark_cache.begin_syncing();
            for pk in pks {
                filler.insert_unchecked(DefaultOrdered(pk), ());
            }
            filler.finish();

            let n_cache_entries = self.watermark_cache.len();
            if n_cache_entries < self.watermark_cache.capacity() {
                self.watermark_cache.set_table_row_count(n_cache_entries);
            }
        }

        self.on_post_commit = true;
        Ok(StateTablePostCommit { inner: self })
    }

    /// Commit the pending watermark, returning the table watermarks to be sealed.
    fn commit_pending_watermark(
        &mut self,
    ) -> Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)> {
        let watermark = self.pending_watermark.take()?;
        trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning");

        assert!(
            !self.pk_indices().is_empty(),
            "pending watermark on a table with an empty pk"
        );
        let watermark_serializer = {
            match self.clean_watermark_index_in_pk {
                None => self.pk_serde.index(0),
                Some(clean_watermark_index_in_pk) => {
                    self.pk_serde.index(clean_watermark_index_in_pk as usize)
                }
            }
        };

        let watermark_type = match self.clean_watermark_index_in_pk {
            None => WatermarkSerdeType::PkPrefix,
            Some(clean_watermark_index_in_pk) => match clean_watermark_index_in_pk {
                0 => WatermarkSerdeType::PkPrefix,
                _ => WatermarkSerdeType::NonPkPrefix,
            },
        };

        let should_clean_watermark = {
            // With a synced watermark cache, cleaning can be skipped as long as the
            // lowest cached key is still above the watermark.
            if USE_WATERMARK_CACHE && self.watermark_cache.is_synced() {
                if let Some(key) = self.watermark_cache.lowest_key() {
                    watermark.as_scalar_ref_impl().default_cmp(&key).is_ge()
                } else {
                    // The cache is synced but empty, so the table has no rows to clean.
                    false
                }
            } else {
                // Without a synced cache we cannot tell, so conservatively clean.
                true
            }
        };

        let watermark_suffix =
            serialize_pk(row::once(Some(watermark.clone())), &watermark_serializer);

        // Compute the watermark to be sealed to the state store.
        let seal_watermark = if should_clean_watermark {
            trace!(table_id = %self.table_id, watermark = ?watermark_suffix, vnodes = ?{
                self.vnodes().iter_vnodes().collect_vec()
            }, "delete range");

            let order_type = watermark_serializer.get_order_types().first().unwrap();

            if order_type.is_ascending() {
                Some((
                    WatermarkDirection::Ascending,
                    VnodeWatermark::new(
                        self.vnodes().clone(),
                        Bytes::copy_from_slice(watermark_suffix.as_ref()),
                    ),
                    watermark_type,
                ))
            } else {
                Some((
                    WatermarkDirection::Descending,
                    VnodeWatermark::new(
                        self.vnodes().clone(),
                        Bytes::copy_from_slice(watermark_suffix.as_ref()),
                    ),
                    watermark_type,
                ))
            }
        } else {
            None
        };
        self.committed_watermark = Some(watermark);

        // Clear the watermark cache after cleaning: its entries below the watermark are
        // no longer valid, and it will be re-synced on the next commit.
        if USE_WATERMARK_CACHE && seal_watermark.is_some() {
            self.watermark_cache.clear();
        }

        seal_watermark.map(|(direction, watermark, serde_type)| {
            (direction, vec![watermark], serde_type)
        })
    }

    pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.row_store.try_flush().await?;
        Ok(())
    }
}

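// A per-barrier lifecycle sketch (hypothetical executor code; `table`, `chunk`,
// `first_epoch`, `new_epoch` and `new_vnodes` are assumed to be in scope):
//
//     table.init_epoch(first_epoch).await?;          // once, before any read or write
//     table.write_chunk(chunk);                      // buffer writes for this epoch
//     table.try_flush().await?;                      // optional spill between barriers
//     let post_commit = table.commit(new_epoch).await?;
//     // ... yield the barrier downstream, then:
//     post_commit.post_yield_barrier(new_vnodes).await?;
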
pub trait RowStream<'a>: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a {}
impl<'a, S: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a> RowStream<'a> for S {}

pub trait KeyedRowStream<'a>: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a {}
impl<'a, S: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a> KeyedRowStream<'a> for S {}

pub trait PkRowStream<'a, K>: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a {}
impl<'a, K, S: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a> PkRowStream<'a, K> for S {}

/// Reconstruct a key of type `Self` from a vnode and the vnode-stripped key bytes.
pub trait FromVnodeBytes {
    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self;
}

impl FromVnodeBytes for Bytes {
    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self {
        prefix_slice_with_vnode(vnode, bytes)
    }
}

// Used when the caller does not need the key: reconstructing it is a no-op.
impl FromVnodeBytes for () {
    fn from_vnode_bytes(_vnode: VirtualNode, _bytes: &Bytes) -> Self {}
}

// Iterator functions
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Scan rows from the state table with the given `pk_range` under the same `vnode`.
    pub async fn iter_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_kv_with_pk_range::<()>(pk_range, vnode, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

    pub async fn iter_keyed_row_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    pub async fn iter_with_vnode_and_output_indices(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        assert!(IS_REPLICATED);
        let stream = self
            .iter_with_vnode(vnode, pk_range, prefetch_options)
            .await?;
        Ok(stream.map(|row| row.map(|row| row.project(&self.output_indices).into_owned_row())))
    }
}

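// A minimal iteration sketch (hypothetical caller code; `table` and `vnode` are assumed
// to be in scope; this mirrors the pattern used by `get_from_one_row_table` below):
//
//     let range: (Bound<OwnedRow>, Bound<OwnedRow>) = (Unbounded, Unbounded);
//     let stream = table
//         .iter_with_vnode(vnode, &range, PrefetchOptions::default())
//         .await?;
//     pin_mut!(stream);
//     while let Some(row) = stream.next().await {
//         let row: OwnedRow = row?;
//         // process `row`
//     }
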
impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    /// Iterate over `(key, row)` pairs of a single vnode within the given memcomparable
    /// key range, either from the in-memory copy or from the state store.
    async fn iter_kv<K: CopyFromSlice + FromVnodeBytes>(
        &self,
        vnode: VirtualNode,
        (start, end): (Bound<Bytes>, Bound<Bytes>),
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        if let Some(rows) = &self.all_rows {
            return Ok(futures::future::Either::Left(futures::stream::iter(
                rows.get(&vnode)
                    .expect("covered vnode")
                    .range((start, end))
                    .map(move |(key, value)| Ok((K::from_vnode_bytes(vnode, key), value.clone()))),
            )));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(futures::future::Either::Right(
            deserialize_keyed_row_stream(
                self.state_store
                    .iter(prefixed_range_with_vnode((start, end), vnode), read_options)
                    .await?,
                &*self.row_serde,
            ),
        ))
    }

    /// Same as `iter_kv`, but in reverse key order.
    async fn rev_iter_kv<K: CopyFromSlice + FromVnodeBytes>(
        &self,
        vnode: VirtualNode,
        (start, end): (Bound<Bytes>, Bound<Bytes>),
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        if let Some(rows) = &self.all_rows {
            return Ok(futures::future::Either::Left(futures::stream::iter(
                rows.get(&vnode)
                    .expect("covered vnode")
                    .range((start, end))
                    .rev()
                    .map(move |(key, value)| Ok((K::from_vnode_bytes(vnode, key), value.clone()))),
            )));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(futures::future::Either::Right(
            deserialize_keyed_row_stream(
                self.state_store
                    .rev_iter(prefixed_range_with_vnode((start, end), vnode), read_options)
                    .await?,
                &*self.row_serde,
            ),
        ))
    }
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Iterate over rows whose pk starts with `pk_prefix`, restricted to `sub_range` on
    /// the remaining pk columns.
    pub async fn iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        let stream = self
            .iter_with_prefix_inner::<false, ()>(pk_prefix, sub_range, prefetch_options)
            .await?;
        Ok(stream.map_ok(|(_, row)| row))
    }

    /// Get the row from a state table that holds at most one row.
    pub async fn get_from_one_row_table(&self) -> StreamExecutorResult<Option<OwnedRow>> {
        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
        let stream = self
            .iter_with_prefix(row::empty(), sub_range, Default::default())
            .await?;
        pin_mut!(stream);

        if let Some(res) = stream.next().await {
            let value = res?.into_owned_row();
            assert!(stream.next().await.is_none());
            Ok(Some(value))
        } else {
            Ok(None)
        }
    }

    /// Get the value of a single-row, single-column table, e.g. one that stores a single
    /// aggregation result.
    pub async fn get_from_one_value_table(&self) -> StreamExecutorResult<Option<ScalarImpl>> {
        Ok(self
            .get_from_one_row_table()
            .await?
            .and_then(|row| row[0].clone()))
    }

    pub async fn iter_keyed_row_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::<false, Bytes>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    /// Like `iter_with_prefix`, but iterate in reverse key order.
    pub async fn rev_iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::<true, ()>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

    async fn iter_with_prefix_inner<const REVERSE: bool, K: CopyFromSlice + FromVnodeBytes>(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
        let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);

        // The given prefix must determine a single vnode to scan.
        let vnode = self.compute_prefix_vnode(&pk_prefix);

        // Construct the prefix hint for the bloom filter. When a hint length is
        // configured, the given prefix is expected to match it exactly.
        let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
        if self.prefix_hint_len != 0 {
            debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
        }
        let prefix_hint = {
            if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() {
                None
            } else {
                let encoded_prefix_len = self
                    .pk_serde
                    .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;

                Some(Bytes::copy_from_slice(
                    &encoded_prefix[..encoded_prefix_len],
                ))
            }
        };

        trace!(
            table_id = %self.table_id(),
            ?prefix_hint, ?pk_prefix,
            ?pk_prefix_indices,
            iter_direction = if REVERSE { "reverse" } else { "forward" },
            "storage_iter_with_prefix"
        );

        let memcomparable_range =
            prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);

        Ok(if REVERSE {
            futures::future::Either::Left(
                self.row_store
                    .rev_iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
                    .await?,
            )
        } else {
            futures::future::Either::Right(
                self.row_store
                    .iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
                    .await?,
            )
        })
    }

    /// Scan raw key-value pairs from the state table with the given `pk_range` under the
    /// same `vnode`.
    async fn iter_kv_with_pk_range<'a, K: CopyFromSlice + FromVnodeBytes>(
        &'a self,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'a, K>> {
        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);

        self.row_store
            .iter_kv(vnode, memcomparable_range, None, prefetch_options)
            .await
    }

    #[cfg(test)]
    pub fn get_watermark_cache(&self) -> &StateTableWatermarkCache {
        &self.watermark_cache
    }
}

/// Deserialize a state-store iterator into a stream of `(key, row)` pairs, where the key
/// type `K` is built by copying the table key bytes.
fn deserialize_keyed_row_stream<'a, K: CopyFromSlice>(
    iter: impl StateStoreIter + 'a,
    deserializer: &'a impl ValueRowSerde,
) -> impl PkRowStream<'a, K> {
    iter.into_stream(move |(key, value)| {
        Ok((
            K::copy_from_slice(key.user_key.table_key.as_ref()),
            deserializer.deserialize(value).map(OwnedRow::new)?,
        ))
    })
    .map_err(Into::into)
}

pub fn prefix_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    range: &(Bound<impl Row>, Bound<impl Row>),
) -> (Bound<Bytes>, Bound<Bytes>) {
    (
        start_range_to_memcomparable(pk_serde, &range.0),
        end_range_to_memcomparable(pk_serde, &range.1, None),
    )
}

fn prefix_and_sub_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    sub_range: &(Bound<impl Row>, Bound<impl Row>),
    pk_prefix: impl Row,
) -> (Bound<Bytes>, Bound<Bytes>) {
    let (range_start, range_end) = sub_range;
    let prefix_serializer = pk_serde.prefix(pk_prefix.len());
    let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
    let start_range = match range_start {
        Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))),
        Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))),
        Unbounded => Bound::Included(Either::Right(&pk_prefix)),
    };
    let end_range = match range_end {
        Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)),
        Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)),
        Unbounded => Unbounded,
    };
    (
        start_range_to_memcomparable(pk_serde, &start_range),
        end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
    )
}

fn start_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => Unbounded,
        Included(r) => {
            let serialized = serialize_pk_prefix(r);

            Included(serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);

            // An excluded prefix bound must skip every key that starts with the prefix,
            // not just the prefix itself.
            start_bound_of_excluded_prefix(&serialized)
        }
    }
}

fn end_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
    serialized_pk_prefix: Option<Bytes>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => match serialized_pk_prefix {
            Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix),
            None => Unbounded,
        },
        Included(r) => {
            let serialized = serialize_pk_prefix(r);

            // An included prefix bound must cover every key that starts with the prefix.
            end_bound_of_prefix(&serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            Excluded(serialized)
        }
    }
}

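// For intuition: with pk columns `(a, b)`, prefix `(a = 1,)` and an unbounded sub-range,
// `prefix_and_sub_range_to_memcomparable` yields the memcomparable range
// `[serialize(1), end_bound_of_prefix(serialize(1)))`, i.e. exactly the keys whose
// encoding starts with the encoded prefix. (Illustrative, based on the helpers above.)
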
/// Fill the columns that are not in the output with NULLs, so that a replicated chunk
/// (which only carries output columns) matches the full table schema.
fn fill_non_output_indices(
    i2o_mapping: &ColIndexMapping,
    data_types: &[DataType],
    chunk: StreamChunk,
) -> StreamChunk {
    let cardinality = chunk.cardinality();
    let (ops, columns, vis) = chunk.into_inner();
    let mut full_columns = Vec::with_capacity(data_types.len());
    for (i, data_type) in data_types.iter().enumerate() {
        if let Some(j) = i2o_mapping.try_map(i) {
            full_columns.push(columns[j].clone());
        } else {
            let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
            column_builder.append_n_null(cardinality);
            let column: ArrayRef = column_builder.finish().into();
            full_columns.push(column)
        }
    }
    let data_chunk = DataChunk::new(full_columns, vis);
    StreamChunk::from_parts(ops, data_chunk)
}

#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    fn check(actual: impl Debug, expect: Expect) {
        let actual = format!("{:#?}", actual);
        expect.assert_eq(&actual);
    }

    #[test]
    fn test_fill_non_output_indices() {
        let data_types = vec![DataType::Int32, DataType::Int32, DataType::Int32];
        let replicated_chunk = [OwnedRow::new(vec![
            Some(222_i32.into()),
            Some(2_i32.into()),
        ])];
        let replicated_chunk = StreamChunk::from_parts(
            vec![Op::Insert],
            DataChunk::from_rows(&replicated_chunk, &[DataType::Int32, DataType::Int32]),
        );
        let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
        let filled_chunk = fill_non_output_indices(&i2o_mapping, &data_types, replicated_chunk);
        check(
            filled_chunk,
            expect![[r#"
                StreamChunk { cardinality: 1, capacity: 1, data:
                +---+---+---+-----+
                | + | 2 |   | 222 |
                +---+---+---+-----+
                }"#]],
        );
    }
}