use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;
use std::ops::Bound;
use std::ops::Bound::*;
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use either::Either;
use foyer::Hint;
use futures::future::try_join_all;
use futures::{Stream, StreamExt, TryStreamExt, pin_mut};
use futures_async_stream::for_await;
use itertools::Itertools;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnDesc, ColumnId, TableId, TableOption, get_dist_key_in_pk_indices,
};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
use risingwave_common::row::{self, Once, OwnedRow, Row, RowExt, once};
use risingwave_common::types::{DataType, Datum, DefaultOrd, DefaultOrdered, ScalarImpl};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{
    CopyFromSlice, TableKey, end_bound_of_prefix, next_key, prefix_slice_with_vnode,
    prefixed_range_with_vnode, start_bound_of_excluded_prefix,
};
use risingwave_hummock_sdk::table_watermark::{
    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_pb::catalog::Table;
use risingwave_storage::StateStore;
use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::mem_table::MemTableError;
use risingwave_storage::row_serde::find_columns_by_ids;
use risingwave_storage::row_serde::row_serde_util::{
    deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode,
};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::*;
use risingwave_storage::table::merge_sort::merge_sort;
use risingwave_storage::table::{KeyedRow, TableDistribution};
use thiserror_ext::AsReport;
use tracing::{Instrument, trace};

use crate::cache::cache_may_stale;
use crate::common::state_cache::{StateCache, StateCacheFiller};
use crate::common::table::state_table_cache::StateTableWatermarkCache;
use crate::executor::StreamExecutorResult;

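/// Capacity of the [`StateTableWatermarkCache`].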
const WATERMARK_CACHE_ENTRIES: usize = 16;

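/// In "insane" consistency-test mode, randomly discard the current write operation with
/// 30% probability by returning early from the enclosing function.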
macro_rules! insane_mode_discard_point {
    () => {{
        use rand::Rng;
        if crate::consistency::insane() && rand::rng().random_bool(0.3) {
            return;
        }
    }};
}

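/// `StateTableInner` is the interface for accessing a table's relational data backed by a
/// KV [`StateStore`], handling pk/value encoding, vnode-based distribution, watermark
/// state cleaning and epoch-based commits. `IS_REPLICATED` marks tables replicated from an
/// upstream table, and `USE_WATERMARK_CACHE` enables caching of the smallest keys to skip
/// unnecessary state cleaning.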
pub struct StateTableInner<
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Id of this table.
    table_id: TableId,

    /// Underlying row store backed by a local state store, with an optional in-memory
    /// preload of all rows.
    row_store: StateTableRowStore<S::Local, SD>,

    /// The global state store, kept to wait for committed epochs.
    store: S,

    /// Current epoch pair. `None` until `init_epoch` is called.
    epoch: Option<EpochPair>,

    /// Serializer/deserializer for the primary key.
    pk_serde: OrderedRowSerde,

    /// Indices of primary key columns, based on all columns of the table.
    pk_indices: Vec<usize>,

    /// Distribution of the state table (vnodes and distribution key).
    distribution: TableDistribution,

    /// Number of leading pk columns usable as a prefix hint (e.g. for the storage bloom
    /// filter).
    prefix_hint_len: usize,

    /// Indices of the columns stored in the value part. `None` means all columns.
    value_indices: Option<Vec<usize>>,

    /// Watermark updated since the last commit, pending to be sealed.
    pending_watermark: Option<ScalarImpl>,
    /// Latest watermark that has been committed to the state store.
    committed_watermark: Option<ScalarImpl>,
    /// Cache of the smallest pks, used to avoid unnecessary state cleaning.
    watermark_cache: StateTableWatermarkCache,

    /// Data types of all table columns.
    data_types: Vec<DataType>,

    /// Mapping from table columns to output columns (for replicated tables).
    i2o_mapping: ColIndexMapping,

    /// Indices of the output columns (for replicated tables).
    output_indices: Vec<usize>,

    op_consistency_level: StateTableOpConsistencyLevel,

    /// Index of the state-cleaning watermark column in the pk. `None` means the first pk
    /// column.
    clean_watermark_index_in_pk: Option<i32>,

    /// Whether a `StateTablePostCommit` from the last `commit` is still pending.
    on_post_commit: bool,
}

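/// [`StateTableInner`] with the default value serde and no replication or watermark cache.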
pub type StateTable<S> = StateTableInner<S, BasicSerde>;
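/// A state table that is replicated from an upstream table, with reads projected onto the
/// configured output columns.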
pub type ReplicatedStateTable<S, SD> = StateTableInner<S, SD, true>;
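/// A state table with the watermark cache enabled, reducing state-cleaning overhead.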
pub type WatermarkCacheStateTable<S> = StateTableInner<S, BasicSerde, false, true>;
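/// A state table whose watermark cache usage is selected by a const parameter.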
pub type WatermarkCacheParameterizedStateTable<S, const USE_WATERMARK_CACHE: bool> =
    StateTableInner<S, BasicSerde, false, USE_WATERMARK_CACHE>;

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Initialize the state table with the given epoch. Must be called exactly once before
    /// any read or write.
    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.row_store
            .init(epoch, self.distribution.vnodes())
            .await?;
        assert_eq!(None, self.epoch.replace(epoch), "should not init twice");
        Ok(())
    }

    /// Wait until `prev_epoch` is committed in the state store.
    pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
        self.store
            .try_wait_epoch(
                HummockReadEpoch::Committed(prev_epoch),
                TryWaitEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await
    }

    pub fn state_store(&self) -> &S {
        &self.store
    }
}

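/// Build an [`OpConsistencyLevel::ConsistentOldValue`] whose old-value check first compares
/// the raw bytes, then falls back to comparing the deserialized rows, logging any mismatch.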
fn consistent_old_value_op(
    row_serde: Arc<impl ValueRowSerde>,
    is_log_store: bool,
) -> OpConsistencyLevel {
    OpConsistencyLevel::ConsistentOldValue {
        check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
            if first == second {
                return true;
            }
            let first = match row_serde.deserialize(first) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?first, "fail to deserialize serialized value");
                    return false;
                }
            };
            let second = match row_serde.deserialize(second) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?second, "fail to deserialize serialized value");
                    return false;
                }
            };
            if first != second {
                error!(first = ?first, second = ?second, "sanity check fail");
                false
            } else {
                true
            }
        }),
        is_log_store,
    }
}

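/// Project the given row variables onto `$value_indices` (if partial value encoding is in
/// use) before evaluating the body; otherwise evaluate the body on the full rows.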
macro_rules! dispatch_value_indices {
    ($value_indices:expr, [$($row_var_name:ident),+], $body:expr) => {
        if let Some(value_indices) = $value_indices {
            $(
                let $row_var_name = $row_var_name.project(value_indices);
            )+
            $body
        } else {
            $body
        }
    };
}

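/// Storage layer of the state table: a local state store plus an optional in-memory copy
/// of all rows that serves reads when row preloading is enabled.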
struct StateTableRowStore<LS: LocalStateStore, SD: ValueRowSerde> {
    state_store: LS,
    /// When `Some`, all rows of the table are kept in memory, keyed by vnode and then by
    /// the memcomparable pk within the vnode. Reads are served from here instead of storage.
    all_rows: Option<HashMap<VirtualNode, BTreeMap<Bytes, OwnedRow>>>,

    table_id: TableId,
    table_option: TableOption,
    row_serde: Arc<SD>,
    pk_serde: OrderedRowSerde,
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn may_reload_all_rows(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        if let Some(rows) = &mut self.all_rows {
            rows.clear();
            let start_time = Instant::now();
            *rows = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| {
                let state_store = &self.state_store;
                let retention_seconds = self.table_option.retention_seconds;
                let row_serde = &self.row_serde;
                async move {
                    let mut rows = BTreeMap::new();
                    let memcomparable_range_with_vnode =
                        prefixed_range_with_vnode::<Bytes>(.., vnode);
                    let stream = deserialize_keyed_row_stream::<Bytes>(
                        state_store
                            .iter(
                                memcomparable_range_with_vnode,
                                ReadOptions {
                                    prefix_hint: None,
                                    prefetch_options: Default::default(),
                                    cache_policy: Default::default(),
                                    retention_seconds,
                                },
                            )
                            .await?,
                        &**row_serde,
                    );
                    pin_mut!(stream);
                    while let Some((encoded_key, row)) = stream.try_next().await? {
                        let key = TableKey(encoded_key);
                        let (iter_vnode, key) = key.split_vnode_bytes();
                        assert_eq!(vnode, iter_vnode);
                        rows.try_insert(key, row).expect("non-duplicated");
                    }
                    Ok((vnode, rows)) as StreamExecutorResult<_>
                }
            }))
            .await?
            .into_iter()
            .collect();
            // Skip the log in debug builds to avoid noise in tests.
            if !cfg!(debug_assertions) {
                info!(table_id = %self.table_id, vnode_count = vnode_bitmap.count_ones(), duration = ?start_time.elapsed(), "finished reloading all rows");
            }
        }
        Ok(())
    }

    async fn init(&mut self, epoch: EpochPair, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        self.state_store.init(InitOptions::new(epoch)).await?;
        self.may_reload_all_rows(vnode_bitmap).await
    }

    async fn update_vnode_bitmap(
        &mut self,
        vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<Arc<Bitmap>> {
        let prev_vnodes = self.state_store.update_vnode_bitmap(vnodes.clone()).await?;
        self.may_reload_all_rows(&vnodes).await?;
        Ok(prev_vnodes)
    }

    async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.state_store.try_flush().await?;
        Ok(())
    }

    async fn seal_current_epoch(
        &mut self,
        next_epoch: u64,
        table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<()> {
        if let Some((direction, watermarks, serde_type)) = &table_watermarks
            && let Some(rows) = &mut self.all_rows
        {
            match serde_type {
                WatermarkSerdeType::PkPrefix => {
                    for vnode_watermark in watermarks {
                        match direction {
                            WatermarkDirection::Ascending => {
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    // Keep only keys at or above the watermark.
                                    *rows = rows.split_off(vnode_watermark.watermark());
                                }
                            }
                            WatermarkDirection::Descending => {
                                let split_off_key = next_key(vnode_watermark.watermark());
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    // Keep only keys at or below the watermark.
                                    rows.split_off(split_off_key.as_slice());
                                }
                            }
                        }
                    }
                }
                WatermarkSerdeType::NonPkPrefix => {
                    warn!(table_id = %self.table_id, "row preloading is disabled because a non-pk-prefix watermark was written");
                    self.all_rows = None;
                }
            }
        }
        self.state_store
            .flush()
            .instrument(tracing::info_span!("state_table_flush"))
            .await?;
        let switch_op_consistency_level =
            switch_consistent_op.map(|new_consistency_level| match new_consistency_level {
                StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
                StateTableOpConsistencyLevel::ConsistentOldValue => {
                    consistent_old_value_op(self.row_serde.clone(), false)
                }
                StateTableOpConsistencyLevel::LogStoreEnabled => {
                    consistent_old_value_op(self.row_serde.clone(), true)
                }
            });
        self.state_store.seal_current_epoch(
            next_epoch,
            SealCurrentEpochOptions {
                table_watermarks,
                switch_op_consistency_level,
            },
        );
        Ok(())
    }
}

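/// Consistency level of write operations on a state table, mirroring
/// [`OpConsistencyLevel`] on the storage side.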
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
    /// Write ops are not checked for consistency.
    Inconsistent,
    /// Write ops are consistent: an insert must not overwrite an existing key, and a
    /// delete/update must match the existing old value.
    ConsistentOldValue,
    /// Same consistency requirement as `ConsistentOldValue`, and additionally the old
    /// value is kept for the log store.
    LogStoreEnabled,
}

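/// Builder for [`StateTableInner`]. The `PreloadAllRow` type parameter forces a decision on
/// row preloading (via one of the `*_preload_all_rows` methods) before `build` is available.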
pub struct StateTableBuilder<
    'a,
    S,
    SD,
    const IS_REPLICATED: bool,
    const USE_WATERMARK_CACHE: bool,
    PreloadAllRow,
> {
    table_catalog: &'a Table,
    store: S,
    vnodes: Option<Arc<Bitmap>>,
    op_consistency_level: Option<StateTableOpConsistencyLevel>,
    output_column_ids: Option<Vec<ColumnId>>,
    preload_all_rows: PreloadAllRow,

    _serde: PhantomData<SD>,
}

impl<
    'a,
    S: StateStore,
    SD: ValueRowSerde,
    const IS_REPLICATED: bool,
    const USE_WATERMARK_CACHE: bool,
> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, ()>
{
    pub fn new(table_catalog: &'a Table, store: S, vnodes: Option<Arc<Bitmap>>) -> Self {
        Self {
            table_catalog,
            store,
            vnodes,
            op_consistency_level: None,
            output_column_ids: None,
            preload_all_rows: (),
            _serde: Default::default(),
        }
    }

    fn with_preload_all_rows(
        self,
        preload_all_rows: bool,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, bool> {
        StateTableBuilder {
            table_catalog: self.table_catalog,
            store: self.store,
            vnodes: self.vnodes,
            op_consistency_level: self.op_consistency_level,
            output_column_ids: self.output_column_ids,
            preload_all_rows,
            _serde: Default::default(),
        }
    }

    pub fn enable_preload_all_rows_by_config(
        self,
        config: &StreamingConfig,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, bool> {
        let developer = &config.developer;
        let preload_all_rows = if developer.default_enable_mem_preload_state_table {
            !developer
                .mem_preload_state_table_ids_blacklist
                .contains(&self.table_catalog.id)
        } else {
            developer
                .mem_preload_state_table_ids_whitelist
                .contains(&self.table_catalog.id)
        };
        self.with_preload_all_rows(preload_all_rows)
    }

    pub fn forbid_preload_all_rows(
        self,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, bool> {
        self.with_preload_all_rows(false)
    }
}

impl<
    'a,
    S: StateStore,
    SD: ValueRowSerde,
    const IS_REPLICATED: bool,
    const USE_WATERMARK_CACHE: bool,
    PreloadAllRow,
> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, PreloadAllRow>
{
    pub fn with_op_consistency_level(
        mut self,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> Self {
        self.op_consistency_level = Some(op_consistency_level);
        self
    }

    pub fn with_output_column_ids(mut self, output_column_ids: Vec<ColumnId>) -> Self {
        self.output_column_ids = Some(output_column_ids);
        self
    }
}

impl<
    'a,
    S: StateStore,
    SD: ValueRowSerde,
    const IS_REPLICATED: bool,
    const USE_WATERMARK_CACHE: bool,
> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, bool>
{
    pub async fn build(self) -> StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE> {
        let mut preload_all_rows = self.preload_all_rows;
        if preload_all_rows
            && let Err(e) =
                risingwave_common::license::Feature::StateTableMemoryPreload.check_available()
        {
            warn!(table_id = self.table_catalog.id, e = %e.as_report(), "table is configured to preload rows to memory, but the feature is not available under the current license");
            preload_all_rows = false;
        }
        StateTableInner::from_table_catalog_inner(
            self.table_catalog,
            self.store,
            self.vnodes,
            self.op_consistency_level
                .unwrap_or(StateTableOpConsistencyLevel::ConsistentOldValue),
            self.output_column_ids.unwrap_or_default(),
            preload_all_rows,
        )
        .await
    }
}
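
// A minimal usage sketch (hypothetical `table_catalog: &Table` and `store: S` values; any
// `S: StateStore` works):
//
//     let table: StateTable<_> = StateTableBuilder::new(table_catalog, store, None)
//         .forbid_preload_all_rows()
//         .build()
//         .await;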

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    #[cfg(any(test, feature = "test"))]
    pub async fn from_table_catalog(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    #[cfg(any(test, feature = "test"))]
    pub async fn from_table_catalog_inconsistent_op(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    async fn from_table_catalog_inner(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        op_consistency_level: StateTableOpConsistencyLevel,
        output_column_ids: Vec<ColumnId>,
        preload_all_rows: bool,
    ) -> Self {
        let table_id = TableId::new(table_catalog.id);
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();
        let data_types: Vec<DataType> = table_catalog
            .columns
            .iter()
            .map(|col| {
                col.get_column_desc()
                    .unwrap()
                    .get_column_type()
                    .unwrap()
                    .into()
            })
            .collect();
        let order_types: Vec<OrderType> = table_catalog
            .pk
            .iter()
            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
            .collect();
        let dist_key_indices: Vec<usize> = table_catalog
            .distribution_key
            .iter()
            .map(|dist_index| *dist_index as usize)
            .collect();

        let pk_indices = table_catalog
            .pk
            .iter()
            .map(|col_order| col_order.column_index as usize)
            .collect_vec();

        // Fall back to computing the positions of the distribution key within the pk when
        // the catalog does not provide `dist_key_in_pk`.
        let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() {
            get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices).unwrap()
        } else {
            table_catalog
                .get_dist_key_in_pk()
                .iter()
                .map(|idx| *idx as usize)
                .collect()
        };

        let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
            let vnode_col_idx = *idx as usize;
            pk_indices.iter().position(|&i| vnode_col_idx == i)
        });

        let distribution =
            TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
        assert_eq!(
            distribution.vnode_count(),
            table_catalog.vnode_count(),
            "vnode count mismatch, scanning table {} under wrong distribution?",
            table_catalog.name,
        );

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);

        let input_value_indices = table_catalog
            .value_indices
            .iter()
            .map(|val| *val as usize)
            .collect_vec();

        let no_shuffle_value_indices = (0..table_columns.len()).collect_vec();

        // If `value_indices` is the identity mapping, store `None` so the full row is used
        // without projection.
        let value_indices = match input_value_indices.len() == table_columns.len()
            && input_value_indices == no_shuffle_value_indices
        {
            true => None,
            false => Some(input_value_indices),
        };
        let prefix_hint_len = table_catalog.read_prefix_len_hint as usize;

        let row_serde = Arc::new(SD::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.clone().into_boxed_slice()),
        ));

        let state_table_op_consistency_level = op_consistency_level;
        let op_consistency_level = match op_consistency_level {
            StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
            StateTableOpConsistencyLevel::ConsistentOldValue => {
                consistent_old_value_op(row_serde.clone(), false)
            }
            StateTableOpConsistencyLevel::LogStoreEnabled => {
                consistent_old_value_op(row_serde.clone(), true)
            }
        };

        let table_option = TableOption::new(table_catalog.retention_seconds);
        let new_local_options = if IS_REPLICATED {
            NewLocalOptions::new_replicated(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
            )
        } else {
            NewLocalOptions::new(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
                true,
            )
        };
        let local_state_store = store.new_local(new_local_options).await;

        // A versioned table catalog implies column-aware value encoding, and vice versa.
        assert_eq!(
            table_catalog.version.is_some(),
            row_serde.kind().is_column_aware()
        );

        let watermark_serde = if pk_indices.is_empty() {
            None
        } else {
            match table_catalog.clean_watermark_index_in_pk {
                None => Some(pk_serde.index(0)),
                Some(clean_watermark_index_in_pk) => {
                    Some(pk_serde.index(clean_watermark_index_in_pk as usize))
                }
            }
        };
        let max_watermark_of_vnodes = distribution
            .vnodes()
            .iter_vnodes()
            .filter_map(|vnode| local_state_store.get_table_watermark(vnode))
            .max();
        let committed_watermark = if let Some(deser) = watermark_serde
            && let Some(max_watermark) = max_watermark_of_vnodes
        {
            let deserialized = deser.deserialize(&max_watermark).ok().and_then(|row| {
                assert!(row.len() == 1);
                row[0].clone()
            });
            if deserialized.is_none() {
                tracing::error!(
                    vnodes = ?distribution.vnodes(),
                    watermark = ?max_watermark,
                    "Failed to deserialize persisted watermark from state store.",
                );
            }
            deserialized
        } else {
            None
        };

        let watermark_cache = if USE_WATERMARK_CACHE {
            StateTableWatermarkCache::new(WATERMARK_CACHE_ENTRIES)
        } else {
            StateTableWatermarkCache::new(0)
        };

        let output_column_ids_to_input_idx = output_column_ids
            .iter()
            .enumerate()
            .map(|(pos, id)| (*id, pos))
            .collect::<HashMap<_, _>>();

        let columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|c| c.column_desc.as_ref().unwrap().into())
            .collect_vec();

        // Compute the mapping from each table column to its position in the output.
        let mut i2o_mapping = vec![None; columns.len()];
        for (i, column) in columns.iter().enumerate() {
            if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
                i2o_mapping[i] = Some(*pos);
            }
        }
        let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());

        let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);

        Self {
            table_id,
            row_store: StateTableRowStore {
                all_rows: preload_all_rows.then(HashMap::new),
                table_option,
                state_store: local_state_store,
                row_serde,
                pk_serde: pk_serde.clone(),
                table_id,
            },
            store,
            epoch: None,
            pk_serde,
            pk_indices,
            distribution,
            prefix_hint_len,
            value_indices,
            pending_watermark: None,
            committed_watermark,
            watermark_cache,
            data_types,
            output_indices,
            i2o_mapping,
            op_consistency_level: state_table_op_consistency_level,
            clean_watermark_index_in_pk: table_catalog.clean_watermark_index_in_pk,
            on_post_commit: false,
        }
    }

    pub fn get_data_types(&self) -> &[DataType] {
        &self.data_types
    }

    pub fn table_id(&self) -> u32 {
        self.table_id.table_id
    }

    fn compute_prefix_vnode(&self, pk_prefix: &impl Row) -> VirtualNode {
        self.distribution
            .try_compute_vnode_by_pk_prefix(pk_prefix)
            .expect("For streaming, the given prefix must be enough to calculate the vnode")
    }

    pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
        self.distribution.compute_vnode_by_pk(pk)
    }

    pub fn pk_indices(&self) -> &[usize] {
        &self.pk_indices
    }

    /// Get the indices of the primary key columns in the output columns. Returns `None` if
    /// any primary key column is not in the output columns.
    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
        assert!(IS_REPLICATED);
        self.pk_indices
            .iter()
            .map(|&i| self.output_indices.iter().position(|&j| i == j))
            .collect()
    }

    pub fn pk_serde(&self) -> &OrderedRowSerde {
        &self.pk_serde
    }

    pub fn vnodes(&self) -> &Arc<Bitmap> {
        self.distribution.vnodes()
    }

    pub fn value_indices(&self) -> &Option<Vec<usize>> {
        &self.value_indices
    }

    pub fn is_consistent_op(&self) -> bool {
        matches!(
            self.op_consistency_level,
            StateTableOpConsistencyLevel::ConsistentOldValue
                | StateTableOpConsistencyLevel::LogStoreEnabled
        )
    }
}

impl<S, SD, const USE_WATERMARK_CACHE: bool> StateTableInner<S, SD, true, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create a replicated state table reading the given `output_column_ids`, with
    /// inconsistent-op consistency and row preloading disabled.
    pub async fn new_replicated(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .with_output_column_ids(output_column_ids)
            .forbid_preload_all_rows()
            .build()
            .await
    }
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
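    /// Get a single row by the given pk. For replicated tables, the returned row is
    /// projected onto the output columns.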
    pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        let row = self.row_store.get(serialized_pk, prefix_hint).await?;
        match row {
            Some(row) => {
                if IS_REPLICATED {
                    // For a replicated table, return only the columns visible in the output.
                    let row = row.project(&self.output_indices);
                    Ok(Some(row.into_owned_row()))
                } else {
                    Ok(Some(row))
                }
            }
            None => Ok(None),
        }
    }

    /// Check whether a row with the given pk exists.
    pub async fn exists(&self, pk: impl Row) -> StreamExecutorResult<bool> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        self.row_store.exists(serialized_pk, prefix_hint).await
    }

    /// Serialize the given pk (or pk prefix) into a vnode-prefixed table key.
    fn serialize_pk(&self, pk: &impl Row) -> TableKey<Bytes> {
        assert!(pk.len() <= self.pk_indices.len());
        serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk))
    }

    fn serialize_pk_and_get_prefix_hint(&self, pk: &impl Row) -> (TableKey<Bytes>, Option<Bytes>) {
        let serialized_pk = self.serialize_pk(&pk);
        let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() {
            Some(serialized_pk.slice(VirtualNode::SIZE..))
        } else {
            #[cfg(debug_assertions)]
            if self.prefix_hint_len != 0 {
                warn!(
                    "prefix_hint_len is not equal to pk.len(), may not be able to utilize bloom filter"
                );
            }
            None
        };
        (serialized_pk, prefix_hint)
    }
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn get(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<Option<OwnedRow>> {
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode();
            return Ok(rows.get(&vnode).expect("covered vnode").get(key).cloned());
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };

        let row_serde = self.row_serde.clone();

        self.state_store
            .on_key_value(key_bytes, read_options, move |_, value| {
                let row = row_serde.deserialize(value)?;
                Ok(OwnedRow::new(row))
            })
            .await
            .map_err(Into::into)
    }

    async fn exists(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<bool> {
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode();
            return Ok(rows.get(&vnode).expect("covered vnode").contains_key(key));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };
        let result = self
            .state_store
            .on_key_value(key_bytes, read_options, move |_, _| Ok(()))
            .await?;
        Ok(result.is_some())
    }
}

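/// Handle returned from [`StateTableInner::commit`]. [`StateTablePostCommit::post_yield_barrier`]
/// must be called after the barrier is yielded, so that a possible vnode bitmap update is
/// applied at the right point.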
#[must_use]
pub struct StateTablePostCommit<
    'a,
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    inner: &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
}

impl<'a, S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTablePostCommit<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub async fn post_yield_barrier(
        mut self,
        new_vnodes: Option<Arc<Bitmap>>,
    ) -> StreamExecutorResult<
        Option<(
            (
                Arc<Bitmap>,
                Arc<Bitmap>,
                &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
            ),
            bool,
        )>,
    > {
        self.inner.on_post_commit = false;
        Ok(if let Some(new_vnodes) = new_vnodes {
            let (old_vnodes, cache_may_stale) =
                self.update_vnode_bitmap(new_vnodes.clone()).await?;
            Some(((new_vnodes, old_vnodes, self.inner), cache_may_stale))
        } else {
            None
        })
    }

    pub fn inner(&self) -> &StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE> {
        &*self.inner
    }

    /// Update the vnode bitmap of the state table, returning the previous vnode bitmap and
    /// whether the executor cache may be stale.
    async fn update_vnode_bitmap(
        &mut self,
        new_vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<(Arc<Bitmap>, bool)> {
        let prev_vnodes = self
            .inner
            .row_store
            .update_vnode_bitmap(new_vnodes.clone())
            .await?;
        assert_eq!(
            &prev_vnodes,
            self.inner.vnodes(),
            "state table and state store vnode bitmap mismatches"
        );

        if self.inner.distribution.is_singleton() {
            assert_eq!(
                &new_vnodes,
                self.inner.vnodes(),
                "should not update vnode bitmap for singleton table"
            );
        }
        assert_eq!(self.inner.vnodes().len(), new_vnodes.len());

        let cache_may_stale = cache_may_stale(self.inner.vnodes(), &new_vnodes);

        if cache_may_stale {
            self.inner.pending_watermark = None;
            if USE_WATERMARK_CACHE {
                self.inner.watermark_cache.clear();
            }
        }

        Ok((
            self.inner.distribution.update_vnode_bitmap(new_vnodes),
            cache_may_stale,
        ))
    }
}

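// Write path of the row store: each mutation first updates the in-memory copy of the rows
// (when row preloading is enabled) and then the mem-table of the local state store.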
impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    /// Panic with the decoded key and values when a mem-table operation is inconsistent.
    fn handle_mem_table_error(&self, e: StorageError) {
        let e = match e.into_inner() {
            ErrorKind::MemTable(e) => e,
            _ => unreachable!("should only get memtable error"),
        };
        match *e {
            MemTableError::InconsistentOperation { key, prev, new, .. } => {
                let (vnode, key) = deserialize_pk_with_vnode(&key, &self.pk_serde).unwrap();
                panic!(
                    "mem-table operation inconsistent! table_id: {}, vnode: {}, key: {:?}, prev: {}, new: {}",
                    self.table_id,
                    vnode,
                    &key,
                    prev.debug_fmt(&*self.row_serde),
                    new.debug_fmt(&*self.row_serde),
                )
            }
        }
    }

    fn insert(&mut self, key: TableKey<Bytes>, value: impl Row) {
        insane_mode_discard_point!();
        let value_bytes = self.row_serde.serialize(&value).into();
        if let Some(rows) = &mut self.all_rows {
            let (vnode, key) = key.split_vnode_bytes();
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .insert(key, value.into_owned_row());
        }
        self.state_store
            .insert(key, value_bytes, None)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn delete(&mut self, key: TableKey<Bytes>, value: impl Row) {
        insane_mode_discard_point!();
        let value_bytes = self.row_serde.serialize(value).into();
        if let Some(rows) = &mut self.all_rows {
            let (vnode, key) = key.split_vnode();
            rows.get_mut(&vnode).expect("covered vnode").remove(key);
        }
        self.state_store
            .delete(key, value_bytes)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn update(&mut self, key_bytes: TableKey<Bytes>, old_value: impl Row, new_value: impl Row) {
        insane_mode_discard_point!();
        let new_value_bytes = self.row_serde.serialize(&new_value).into();
        let old_value_bytes = self.row_serde.serialize(old_value).into();
        if let Some(rows) = &mut self.all_rows {
            let (vnode, key) = key_bytes.split_vnode_bytes();
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .insert(key, new_value.into_owned_row());
        }
        self.state_store
            .insert(key_bytes, new_value_bytes, Some(old_value_bytes))
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
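    /// Insert a row into the state table. The row must match the full table schema; its pk
    /// is extracted via `pk_indices`.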
    pub fn insert(&mut self, value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&value).project(pk_indices);
        if USE_WATERMARK_CACHE {
            self.watermark_cache.insert(&pk);
        }

        let key_bytes = self.serialize_pk(&pk);
        dispatch_value_indices!(&self.value_indices, [value], {
            self.row_store.insert(key_bytes, value)
        })
    }

    /// Delete a row from the state table. The row must match the full table schema.
    pub fn delete(&mut self, old_value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&old_value).project(pk_indices);
        if USE_WATERMARK_CACHE {
            self.watermark_cache.delete(&pk);
        }

        let key_bytes = self.serialize_pk(&pk);
        dispatch_value_indices!(&self.value_indices, [old_value], {
            self.row_store.delete(key_bytes, old_value)
        })
    }

    /// Update a row in place. The old and new values must have the same pk.
    pub fn update(&mut self, old_value: impl Row, new_value: impl Row) {
        let old_pk = (&old_value).project(self.pk_indices());
        let new_pk = (&new_value).project(self.pk_indices());
        debug_assert!(
            Row::eq(&old_pk, new_pk),
            "pk should not change: {old_pk:?} vs {new_pk:?}. {}",
            self.table_id
        );

        let key_bytes = self.serialize_pk(&new_pk);
        dispatch_value_indices!(&self.value_indices, [old_value, new_value], {
            self.row_store.update(key_bytes, old_value, new_value)
        })
    }

    /// Write a record (insert / delete / update) into the state table.
    pub fn write_record(&mut self, record: Record<impl Row>) {
        match record {
            Record::Insert { new_row } => self.insert(new_row),
            Record::Delete { old_row } => self.delete(old_row),
            Record::Update { old_row, new_row } => self.update(old_row, new_row),
        }
    }

    fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
        fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
    }

    /// Write a stream chunk into the state table, dispatching each visible row by its op.
    #[allow(clippy::disallowed_methods)]
    pub fn write_chunk(&mut self, chunk: StreamChunk) {
        let chunk = if IS_REPLICATED {
            self.fill_non_output_indices(chunk)
        } else {
            chunk
        };

        let vnodes = self
            .distribution
            .compute_chunk_vnode(&chunk, &self.pk_indices);

        for (idx, optional_row) in chunk.rows_with_holes().enumerate() {
            let Some((op, row)) = optional_row else {
                continue;
            };
            let pk = row.project(&self.pk_indices);
            let vnode = vnodes[idx];
            let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, vnode);
            match op {
                Op::Insert | Op::UpdateInsert => {
                    if USE_WATERMARK_CACHE {
                        self.watermark_cache.insert(&pk);
                    }
                    dispatch_value_indices!(&self.value_indices, [row], {
                        self.row_store.insert(key_bytes, row);
                    });
                }
                Op::Delete | Op::UpdateDelete => {
                    if USE_WATERMARK_CACHE {
                        self.watermark_cache.delete(&pk);
                    }
                    dispatch_value_indices!(&self.value_indices, [row], {
                        self.row_store.delete(key_bytes, row);
                    });
                }
            }
        }
    }

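    /// Update the state-cleaning watermark. It is buffered and applied to storage on the
    /// next `commit`.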
    pub fn update_watermark(&mut self, watermark: ScalarImpl) {
        trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark");
        self.pending_watermark = Some(watermark);
    }

    /// Return the latest watermark that has been committed.
    pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> {
        self.committed_watermark.as_ref()
    }

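    /// Commit pending writes and the pending watermark at the boundary to `new_epoch`. The
    /// returned [`StateTablePostCommit`] must be consumed via `post_yield_barrier`.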
    pub async fn commit(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        self.commit_inner(new_epoch, None).await
    }

    #[cfg(test)]
    pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
        self.commit_assert_no_update_vnode_bitmap(new_epoch).await
    }

    pub async fn commit_assert_no_update_vnode_bitmap(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<()> {
        let post_commit = self.commit_inner(new_epoch, None).await?;
        post_commit.post_yield_barrier(None).await?;
        Ok(())
    }

    pub async fn commit_may_switch_consistent_op(
        &mut self,
        new_epoch: EpochPair,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        if self.op_consistency_level != op_consistency_level {
            // Skip the log in debug builds to avoid noise in tests.
            if !cfg!(debug_assertions) {
                info!(
                    ?new_epoch,
                    prev_op_consistency_level = ?self.op_consistency_level,
                    ?op_consistency_level,
                    table_id = self.table_id.table_id,
                    "switch to new op consistency level"
                );
            }
            self.commit_inner(new_epoch, Some(op_consistency_level))
                .await
        } else {
            self.commit_inner(new_epoch, None).await
        }
    }

    async fn commit_inner(
        &mut self,
        new_epoch: EpochPair,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        assert!(!self.on_post_commit);
        assert_eq!(
            self.epoch.expect("should only be called after init").curr,
            new_epoch.prev
        );
        if let Some(new_consistency_level) = switch_consistent_op {
            assert_ne!(self.op_consistency_level, new_consistency_level);
            self.op_consistency_level = new_consistency_level;
        }
        trace!(
            table_id = %self.table_id,
            epoch = ?self.epoch,
            "commit state table"
        );

        let table_watermarks = self.commit_pending_watermark();
        self.row_store
            .seal_current_epoch(new_epoch.curr, table_watermarks, switch_consistent_op)
            .await?;
        self.epoch = Some(new_epoch);

        // Refill the watermark cache from storage if it has not been synced yet.
        if USE_WATERMARK_CACHE
            && !self.watermark_cache.is_synced()
            && let Some(ref watermark) = self.committed_watermark
        {
            let range: (Bound<Once<Datum>>, Bound<Once<Datum>>) =
                (Included(once(Some(watermark.clone()))), Unbounded);
            let mut pks = Vec::with_capacity(self.watermark_cache.capacity());
            {
                let mut streams = vec![];
                for vnode in self.vnodes().iter_vnodes() {
                    let stream = self
                        .iter_keyed_row_with_vnode(vnode, &range, PrefetchOptions::default())
                        .await?;
                    streams.push(Box::pin(stream));
                }
                let merged_stream = merge_sort(streams);
                pin_mut!(merged_stream);

                #[for_await]
                for entry in merged_stream.take(self.watermark_cache.capacity()) {
                    let keyed_row = entry?;
                    let pk = self.pk_serde.deserialize(keyed_row.key())?;
                    // Skip rows whose watermark column is NULL: they are not comparable
                    // with the watermark and are never cleaned.
                    if !pk.is_null_at(self.clean_watermark_index_in_pk.unwrap_or(0) as usize) {
                        pks.push(pk);
                    }
                }
            }

            let mut filler = self.watermark_cache.begin_syncing();
            for pk in pks {
                filler.insert_unchecked(DefaultOrdered(pk), ());
            }
            filler.finish();

            let n_cache_entries = self.watermark_cache.len();
            if n_cache_entries < self.watermark_cache.capacity() {
                self.watermark_cache.set_table_row_count(n_cache_entries);
            }
        }

        self.on_post_commit = true;
        Ok(StateTablePostCommit { inner: self })
    }

    /// Commit the pending watermark, returning the watermarks to be sealed to storage.
    fn commit_pending_watermark(
        &mut self,
    ) -> Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)> {
        let watermark = self.pending_watermark.take()?;
        trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning");

        assert!(
            !self.pk_indices().is_empty(),
            "see pending watermark on empty pk"
        );
        let watermark_serializer = {
            match self.clean_watermark_index_in_pk {
                None => self.pk_serde.index(0),
                Some(clean_watermark_index_in_pk) => {
                    self.pk_serde.index(clean_watermark_index_in_pk as usize)
                }
            }
        };

        let watermark_type = match self.clean_watermark_index_in_pk {
            None => WatermarkSerdeType::PkPrefix,
            Some(clean_watermark_index_in_pk) => match clean_watermark_index_in_pk {
                0 => WatermarkSerdeType::PkPrefix,
                _ => WatermarkSerdeType::NonPkPrefix,
            },
        };

        // With a synced watermark cache, cleaning is only needed once the watermark has
        // passed the smallest cached key; otherwise conservatively always clean.
        let should_clean_watermark = {
            if USE_WATERMARK_CACHE && self.watermark_cache.is_synced() {
                if let Some(key) = self.watermark_cache.lowest_key() {
                    watermark.as_scalar_ref_impl().default_cmp(&key).is_ge()
                } else {
                    // The cache is synced but empty, so the table has no rows and there is
                    // nothing to clean.
                    false
                }
            } else {
                true
            }
        };

        let watermark_suffix =
            serialize_pk(row::once(Some(watermark.clone())), &watermark_serializer);

        let seal_watermark = if should_clean_watermark {
            trace!(table_id = %self.table_id, watermark = ?watermark_suffix, vnodes = ?{
                self.vnodes().iter_vnodes().collect_vec()
            }, "delete range");

            let order_type = watermark_serializer.get_order_types().first().unwrap();

            if order_type.is_ascending() {
                Some((
                    WatermarkDirection::Ascending,
                    VnodeWatermark::new(
                        self.vnodes().clone(),
                        Bytes::copy_from_slice(watermark_suffix.as_ref()),
                    ),
                    watermark_type,
                ))
            } else {
                Some((
                    WatermarkDirection::Descending,
                    VnodeWatermark::new(
                        self.vnodes().clone(),
                        Bytes::copy_from_slice(watermark_suffix.as_ref()),
                    ),
                    watermark_type,
                ))
            }
        } else {
            None
        };
        self.committed_watermark = Some(watermark);

        // After sealing a cleaning watermark, the cached lowest keys may be deleted, so
        // clear the cache and let it resync on the next commit.
        if USE_WATERMARK_CACHE && seal_watermark.is_some() {
            self.watermark_cache.clear();
        }

        seal_watermark.map(|(direction, watermark, watermark_type)| {
            (direction, vec![watermark], watermark_type)
        })
    }

    pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.row_store.try_flush().await?;
        Ok(())
    }
}

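// Stream types yielded by the iteration methods below: plain rows, rows keyed by the
// vnode-prefixed table key, and generic `(key, row)` pairs.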
pub trait RowStream<'a> = Stream<Item = StreamExecutorResult<OwnedRow>> + 'a;
pub trait KeyedRowStream<'a> = Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a;
pub trait PkRowStream<'a, K> = Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a;

/// Build the key part of a `(key, row)` stream item from a vnode and key bytes: `Bytes`
/// keeps the vnode-prefixed key, while `()` discards it.
pub trait FromVnodeBytes {
    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self;
}

impl FromVnodeBytes for Bytes {
    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self {
        prefix_slice_with_vnode(vnode, bytes)
    }
}

impl FromVnodeBytes for () {
    fn from_vnode_bytes(_vnode: VirtualNode, _bytes: &Bytes) -> Self {}
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Iterate rows of the given vnode within `pk_range`.
    pub async fn iter_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_kv_with_pk_range::<()>(pk_range, vnode, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

    pub async fn iter_keyed_row_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    pub async fn iter_with_vnode_and_output_indices(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        assert!(IS_REPLICATED);
        let stream = self
            .iter_with_vnode(vnode, pk_range, prefetch_options)
            .await?;
        Ok(stream.map(|row| row.map(|row| row.project(&self.output_indices).into_owned_row())))
    }
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn iter_kv<K: CopyFromSlice + FromVnodeBytes>(
        &self,
        vnode: VirtualNode,
        (start, end): (Bound<Bytes>, Bound<Bytes>),
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        if let Some(rows) = &self.all_rows {
            return Ok(futures::future::Either::Left(futures::stream::iter(
                rows.get(&vnode)
                    .expect("covered vnode")
                    .range((start, end))
                    .map(move |(key, value)| Ok((K::from_vnode_bytes(vnode, key), value.clone()))),
            )));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(futures::future::Either::Right(
            deserialize_keyed_row_stream(
                self.state_store
                    .iter(prefixed_range_with_vnode((start, end), vnode), read_options)
                    .await?,
                &*self.row_serde,
            ),
        ))
    }

    async fn rev_iter_kv<K: CopyFromSlice + FromVnodeBytes>(
        &self,
        vnode: VirtualNode,
        (start, end): (Bound<Bytes>, Bound<Bytes>),
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        if let Some(rows) = &self.all_rows {
            return Ok(futures::future::Either::Left(futures::stream::iter(
                rows.get(&vnode)
                    .expect("covered vnode")
                    .range((start, end))
                    .rev()
                    .map(move |(key, value)| Ok((K::from_vnode_bytes(vnode, key), value.clone()))),
            )));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(futures::future::Either::Right(
            deserialize_keyed_row_stream(
                self.state_store
                    .rev_iter(prefixed_range_with_vnode((start, end), vnode), read_options)
                    .await?,
                &*self.row_serde,
            ),
        ))
    }
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Iterate rows that share the given pk prefix, restricted to `sub_range` on the
    /// remaining pk columns.
    pub async fn iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        let stream = self
            .iter_with_prefix_inner::<false, ()>(pk_prefix, sub_range, prefetch_options)
            .await?;
        Ok(stream.map_ok(|(_, row)| row))
    }

    /// Get the single row from a table that holds at most one row.
    pub async fn get_from_one_row_table(&self) -> StreamExecutorResult<Option<OwnedRow>> {
        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
        let stream = self
            .iter_with_prefix(row::empty(), sub_range, Default::default())
            .await?;
        pin_mut!(stream);

        if let Some(res) = stream.next().await {
            let value = res?.into_owned_row();
            assert!(stream.next().await.is_none());
            Ok(Some(value))
        } else {
            Ok(None)
        }
    }

    /// Get the scalar value from a table that holds at most one row with a single value
    /// column.
    pub async fn get_from_one_value_table(&self) -> StreamExecutorResult<Option<ScalarImpl>> {
        Ok(self
            .get_from_one_row_table()
            .await?
            .and_then(|row| row[0].clone()))
    }

    pub async fn iter_keyed_row_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::<false, Bytes>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    /// Same as `iter_with_prefix`, but iterates in reverse order.
    pub async fn rev_iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::<true, ()>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

    async fn iter_with_prefix_inner<const REVERSE: bool, K: CopyFromSlice + FromVnodeBytes>(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
        let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);

        // The pk prefix must cover the whole distribution key to pin down a single vnode.
        let vnode = self.compute_prefix_vnode(&pk_prefix);

        let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
        if self.prefix_hint_len != 0 {
            debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
        }
        // Construct the prefix hint for the storage bloom filter when the prefix is long
        // enough.
        let prefix_hint = {
            if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() {
                None
            } else {
                let encoded_prefix_len = self
                    .pk_serde
                    .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;

                Some(Bytes::copy_from_slice(
                    &encoded_prefix[..encoded_prefix_len],
                ))
            }
        };

        trace!(
            table_id = %self.table_id(),
            ?prefix_hint, ?pk_prefix,
            ?pk_prefix_indices,
            iter_direction = if REVERSE { "reverse" } else { "forward" },
            "storage_iter_with_prefix"
        );

        let memcomparable_range =
            prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);

        Ok(if REVERSE {
            futures::future::Either::Left(
                self.row_store
                    .rev_iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
                    .await?,
            )
        } else {
            futures::future::Either::Right(
                self.row_store
                    .iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
                    .await?,
            )
        })
    }

    /// Iterate `(key, row)` pairs of the given vnode within `pk_range`, without a prefix
    /// hint.
    async fn iter_kv_with_pk_range<'a, K: CopyFromSlice + FromVnodeBytes>(
        &'a self,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'a, K>> {
        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);

        self.row_store
            .iter_kv(vnode, memcomparable_range, None, prefetch_options)
            .await
    }

    #[cfg(test)]
    pub fn get_watermark_cache(&self) -> &StateTableWatermarkCache {
        &self.watermark_cache
    }
}

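/// Deserialize a raw state-store iterator into a stream of `(key, row)` pairs. The key type
/// `K` decides whether the table key bytes are kept (`Bytes`) or discarded (`()`).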
fn deserialize_keyed_row_stream<'a, K: CopyFromSlice>(
    iter: impl StateStoreIter + 'a,
    deserializer: &'a impl ValueRowSerde,
) -> impl PkRowStream<'a, K> {
    iter.into_stream(move |(key, value)| {
        Ok((
            K::copy_from_slice(key.user_key.table_key.as_ref()),
            deserializer.deserialize(value).map(OwnedRow::new)?,
        ))
    })
    .map_err(Into::into)
}

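/// Convert a range of pk prefixes into a memcomparable byte range.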
pub fn prefix_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    range: &(Bound<impl Row>, Bound<impl Row>),
) -> (Bound<Bytes>, Bound<Bytes>) {
    (
        start_range_to_memcomparable(pk_serde, &range.0),
        end_range_to_memcomparable(pk_serde, &range.1, None),
    )
}

fn prefix_and_sub_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    sub_range: &(Bound<impl Row>, Bound<impl Row>),
    pk_prefix: impl Row,
) -> (Bound<Bytes>, Bound<Bytes>) {
    let (range_start, range_end) = sub_range;
    let prefix_serializer = pk_serde.prefix(pk_prefix.len());
    let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
    let start_range = match range_start {
        Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))),
        Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))),
        Unbounded => Bound::Included(Either::Right(&pk_prefix)),
    };
    let end_range = match range_end {
        Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)),
        Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)),
        Unbounded => Unbounded,
    };
    (
        start_range_to_memcomparable(pk_serde, &start_range),
        end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
    )
}

fn start_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => Unbounded,
        Included(r) => {
            let serialized = serialize_pk_prefix(r);
            Included(serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            start_bound_of_excluded_prefix(&serialized)
        }
    }
}

fn end_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
    serialized_pk_prefix: Option<Bytes>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => match serialized_pk_prefix {
            Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix),
            None => Unbounded,
        },
        Included(r) => {
            let serialized = serialize_pk_prefix(r);
            end_bound_of_prefix(&serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            Excluded(serialized)
        }
    }
}

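/// For a replicated chunk that contains only the output columns, pad the missing
/// (non-output) columns with NULL columns so the chunk matches the full table schema.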
fn fill_non_output_indices(
    i2o_mapping: &ColIndexMapping,
    data_types: &[DataType],
    chunk: StreamChunk,
) -> StreamChunk {
    let cardinality = chunk.cardinality();
    let (ops, columns, vis) = chunk.into_inner();
    let mut full_columns = Vec::with_capacity(data_types.len());
    for (i, data_type) in data_types.iter().enumerate() {
        if let Some(j) = i2o_mapping.try_map(i) {
            full_columns.push(columns[j].clone());
        } else {
            let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
            column_builder.append_n_null(cardinality);
            let column: ArrayRef = column_builder.finish().into();
            full_columns.push(column)
        }
    }
    let data_chunk = DataChunk::new(full_columns, vis);
    StreamChunk::from_parts(ops, data_chunk)
}

#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    fn check(actual: impl Debug, expect: Expect) {
        let actual = format!("{:#?}", actual);
        expect.assert_eq(&actual);
    }

    #[test]
    fn test_fill_non_output_indices() {
        let data_types = vec![DataType::Int32, DataType::Int32, DataType::Int32];
        let replicated_chunk = [OwnedRow::new(vec![
            Some(222_i32.into()),
            Some(2_i32.into()),
        ])];
        let replicated_chunk = StreamChunk::from_parts(
            vec![Op::Insert],
            DataChunk::from_rows(&replicated_chunk, &[DataType::Int32, DataType::Int32]),
        );
        let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
        let filled_chunk = fill_non_output_indices(&i2o_mapping, &data_types, replicated_chunk);
        check(
            filled_chunk,
            expect![[r#"
                StreamChunk { cardinality: 1, capacity: 1, data:
                +---+---+---+-----+
                | + | 2 |   | 222 |
                +---+---+---+-----+
                }"#]],
        );
    }
}