use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;
use std::ops::Bound;
use std::ops::Bound::*;
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use either::Either;
use foyer::Hint;
use futures::future::try_join_all;
use futures::{Stream, StreamExt, TryStreamExt, pin_mut};
use itertools::Itertools;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnDesc, ColumnId, TableId, TableOption, get_dist_key_in_pk_indices,
};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
use risingwave_common::row::{self, OwnedRow, Row, RowExt};
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{
    CopyFromSlice, TableKey, end_bound_of_prefix, next_key, prefix_slice_with_vnode,
    prefixed_range_with_vnode, start_bound_of_excluded_prefix,
};
use risingwave_hummock_sdk::table_watermark::{
    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_pb::catalog::Table;
use risingwave_storage::StateStore;
use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::mem_table::MemTableError;
use risingwave_storage::row_serde::find_columns_by_ids;
use risingwave_storage::row_serde::row_serde_util::{
    deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode,
};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::*;
use risingwave_storage::table::{KeyedRow, TableDistribution};
use thiserror_ext::AsReport;
use tracing::{Instrument, trace};

use crate::cache::cache_may_stale;
use crate::executor::StreamExecutorResult;

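/// In "insane" mode (see `crate::consistency::insane()`), randomly discard roughly 30%
/// of write operations before they reach the mem-table, to exercise the tolerance of
/// downstream consistency handling. No-op in normal operation.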
macro_rules! insane_mode_discard_point {
    () => {{
        use rand::Rng;
        if crate::consistency::insane() && rand::rng().random_bool(0.3) {
            return;
        }
    }};
}

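/// `StateTableInner` is the interface for accessing relational data in KV state stores
/// (e.g. Hummock) with row-based encoding.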
pub struct StateTableInner<S, SD = BasicSerde, const IS_REPLICATED: bool = false>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Id of this table.
    table_id: TableId,

    /// Underlying row store, wrapping the local state store and the optional
    /// in-memory copy of all rows.
    row_store: StateTableRowStore<S::Local, SD>,

    /// The global state store, kept to e.g. wait for committed epochs.
    store: S,

    /// Current epoch pair; `None` until `init_epoch` is called.
    epoch: Option<EpochPair>,

    /// Serializer/deserializer of the primary key.
    pk_serde: OrderedRowSerde,

    /// Indices of primary-key columns, relative to all columns of the table.
    pk_indices: Vec<usize>,

    /// Distribution (vnode mapping) of the state table.
    distribution: TableDistribution,

    prefix_hint_len: usize,

    /// Indices of the columns persisted as the value; `None` means all columns.
    value_indices: Option<Vec<usize>>,

    pub clean_watermark_index: Option<usize>,
    /// Watermark to be applied on the next commit.
    pending_watermark: Option<ScalarImpl>,
    /// Latest watermark that has been committed.
    committed_watermark: Option<ScalarImpl>,
    watermark_serde: Option<(OrderedRowSerde, WatermarkSerdeType)>,

    /// Data types of all columns.
    data_types: Vec<DataType>,

    /// Mapping from input columns to output columns (used by replicated tables).
    i2o_mapping: ColIndexMapping,

    pub output_indices: Vec<usize>,

    op_consistency_level: StateTableOpConsistencyLevel,

    on_post_commit: bool,
}

/// [`StateTableInner`] with the default [`BasicSerde`] value encoding; the common case.
pub type StateTable<S> = StateTableInner<S, BasicSerde>;
/// Replicated variant of [`StateTable`], which projects reads onto `output_indices`.
pub type ReplicatedStateTable<S, SD> = StateTableInner<S, SD, true>;

impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Initialize the state table with the given epoch. Must be called exactly once
    /// before any read or write.
    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.row_store
            .init(epoch, self.distribution.vnodes())
            .await?;
        assert_eq!(None, self.epoch.replace(epoch), "should not init twice");
        Ok(())
    }

    pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
        self.store
            .try_wait_epoch(
                HummockReadEpoch::Committed(prev_epoch),
                TryWaitEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await
    }

    pub fn state_store(&self) -> &S {
        &self.store
    }
}

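/// Build an [`OpConsistencyLevel::ConsistentOldValue`] whose checker first compares old
/// values byte-wise and, on mismatch, falls back to comparing the deserialized rows, so
/// that different-but-equivalent encodings still pass the sanity check.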
fn consistent_old_value_op(
    row_serde: Arc<impl ValueRowSerde>,
    is_log_store: bool,
) -> OpConsistencyLevel {
    OpConsistencyLevel::ConsistentOldValue {
        check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
            if first == second {
                return true;
            }
            let first = match row_serde.deserialize(first) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?first, "failed to deserialize value");
                    return false;
                }
            };
            let second = match row_serde.deserialize(second) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?second, "failed to deserialize value");
                    return false;
                }
            };
            if first != second {
                error!(first = ?first, second = ?second, "sanity check failed");
                false
            } else {
                true
            }
        }),
        is_log_store,
    }
}

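/// Project the given row variables onto `value_indices` (when set) before evaluating
/// `$body`; otherwise evaluate `$body` on the full rows.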
macro_rules! dispatch_value_indices {
    ($value_indices:expr, [$($row_var_name:ident),+], $body:expr) => {
        if let Some(value_indices) = $value_indices {
            $(
                let $row_var_name = $row_var_name.project(value_indices);
            )+
            $body
        } else {
            $body
        }
    };
}

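/// Storage layer of the state table: wraps the local state store and, when row
/// preloading is enabled, an in-memory copy of all rows (`vnode -> ordered key -> row`)
/// that serves reads without hitting the state store.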
struct StateTableRowStore<LS: LocalStateStore, SD: ValueRowSerde> {
    state_store: LS,
    /// `Some` iff row preloading is enabled. Kept in sync with every write.
    all_rows: Option<HashMap<VirtualNode, BTreeMap<Bytes, OwnedRow>>>,

    table_id: TableId,
    table_option: TableOption,
    row_serde: Arc<SD>,
    pk_serde: OrderedRowSerde,
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn may_reload_all_rows(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        if let Some(rows) = &mut self.all_rows {
            rows.clear();
            let start_time = Instant::now();
            *rows = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| {
                let state_store = &self.state_store;
                let retention_seconds = self.table_option.retention_seconds;
                let row_serde = &self.row_serde;
                async move {
                    let mut rows = BTreeMap::new();
                    let memcomparable_range_with_vnode =
                        prefixed_range_with_vnode::<Bytes>(.., vnode);
                    let stream = deserialize_keyed_row_stream::<Bytes>(
                        state_store
                            .iter(
                                memcomparable_range_with_vnode,
                                ReadOptions {
                                    prefix_hint: None,
                                    prefetch_options: Default::default(),
                                    cache_policy: Default::default(),
                                    retention_seconds,
                                },
                            )
                            .await?,
                        &**row_serde,
                    );
                    pin_mut!(stream);
                    while let Some((encoded_key, row)) = stream.try_next().await? {
                        let key = TableKey(encoded_key);
                        let (iter_vnode, key) = key.split_vnode_bytes();
                        assert_eq!(vnode, iter_vnode);
                        rows.try_insert(key, row).expect("non-duplicated");
                    }
                    StreamExecutorResult::Ok((vnode, rows))
                }
            }))
            .await?
            .into_iter()
            .collect();
            if !cfg!(debug_assertions) {
                info!(
                    table_id = %self.table_id,
                    vnode_count = vnode_bitmap.count_ones(),
                    duration = ?start_time.elapsed(),
                    "finished reloading all rows"
                );
            }
        }
        Ok(())
    }

    async fn init(&mut self, epoch: EpochPair, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        self.state_store.init(InitOptions::new(epoch)).await?;
        self.may_reload_all_rows(vnode_bitmap).await
    }

    async fn update_vnode_bitmap(
        &mut self,
        vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<Arc<Bitmap>> {
        let prev_vnodes = self.state_store.update_vnode_bitmap(vnodes.clone()).await?;
        self.may_reload_all_rows(&vnodes).await?;
        Ok(prev_vnodes)
    }

    async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.state_store.try_flush().await?;
        Ok(())
    }

    async fn seal_current_epoch(
        &mut self,
        next_epoch: u64,
        table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<()> {
        if let Some((direction, watermarks, serde_type)) = &table_watermarks
            && let Some(rows) = &mut self.all_rows
        {
            // Apply the watermarks to the in-memory rows so that they stay consistent
            // with what state-store cleaning will eventually remove.
            match serde_type {
                WatermarkSerdeType::PkPrefix => {
                    for vnode_watermark in watermarks {
                        match direction {
                            WatermarkDirection::Ascending => {
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    // Keep only keys at or above the watermark.
                                    *rows = rows.split_off(vnode_watermark.watermark());
                                }
                            }
                            WatermarkDirection::Descending => {
                                let split_off_key = next_key(vnode_watermark.watermark());
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    // Keep only keys at or below the watermark.
                                    rows.split_off(split_off_key.as_slice());
                                }
                            }
                        }
                    }
                }
                WatermarkSerdeType::NonPkPrefix => {
                    warn!(
                        table_id = %self.table_id,
                        "row preloading disabled: a non-pk-prefix watermark was written"
                    );
                    self.all_rows = None;
                }
            }
        }
        self.state_store
            .flush()
            .instrument(tracing::info_span!("state_table_flush"))
            .await?;
        let switch_op_consistency_level =
            switch_consistent_op.map(|new_consistency_level| match new_consistency_level {
                StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
                StateTableOpConsistencyLevel::ConsistentOldValue => {
                    consistent_old_value_op(self.row_serde.clone(), false)
                }
                StateTableOpConsistencyLevel::LogStoreEnabled => {
                    consistent_old_value_op(self.row_serde.clone(), true)
                }
            });
        self.state_store.seal_current_epoch(
            next_epoch,
            SealCurrentEpochOptions {
                table_watermarks,
                switch_op_consistency_level,
            },
        );
        Ok(())
    }
}

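/// Consistency level of write operations applied to a state table.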
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
    /// Ops are inconsistent: no sanity check is performed on writes.
    Inconsistent,
    /// Ops are consistent:
    /// - an insert must not overwrite an existing key;
    /// - a delete or update must find an existing key whose value matches the passed old value.
    ConsistentOldValue,
    /// Same consistency requirement as `ConsistentOldValue`, and additionally persists
    /// the old value of deletes and updates, as required by the log store.
    LogStoreEnabled,
}

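/// Builder of [`StateTableInner`]. A minimal usage sketch (assuming `table_catalog`,
/// `store`, `vnodes`, and the streaming `config` are provided by the caller):
///
/// ```ignore
/// let table: StateTable<_> = StateTableBuilder::new(&table_catalog, store, vnodes)
///     .enable_preload_all_rows_by_config(&config)
///     .build()
///     .await;
/// ```
///
/// The `PreloadAllRow` type parameter is a type-state marker: `build` is only available
/// after one of the `*preload_all_rows*` methods has turned it from `()` into `bool`.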
pub struct StateTableBuilder<'a, S, SD, const IS_REPLICATED: bool, PreloadAllRow> {
    table_catalog: &'a Table,
    store: S,
    vnodes: Option<Arc<Bitmap>>,
    op_consistency_level: Option<StateTableOpConsistencyLevel>,
    output_column_ids: Option<Vec<ColumnId>>,
    preload_all_rows: PreloadAllRow,

    _serde: PhantomData<SD>,
}

impl<'a, S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
    StateTableBuilder<'a, S, SD, IS_REPLICATED, ()>
{
    pub fn new(table_catalog: &'a Table, store: S, vnodes: Option<Arc<Bitmap>>) -> Self {
        Self {
            table_catalog,
            store,
            vnodes,
            op_consistency_level: None,
            output_column_ids: None,
            preload_all_rows: (),
            _serde: Default::default(),
        }
    }

    fn with_preload_all_rows(
        self,
        preload_all_rows: bool,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, bool> {
        StateTableBuilder {
            table_catalog: self.table_catalog,
            store: self.store,
            vnodes: self.vnodes,
            op_consistency_level: self.op_consistency_level,
            output_column_ids: self.output_column_ids,
            preload_all_rows,
            _serde: Default::default(),
        }
    }

    pub fn enable_preload_all_rows_by_config(
        self,
        config: &StreamingConfig,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, bool> {
        let developer = &config.developer;
        let preload_all_rows = if developer.default_enable_mem_preload_state_table {
            !developer
                .mem_preload_state_table_ids_blacklist
                .contains(&self.table_catalog.id.as_raw_id())
        } else {
            developer
                .mem_preload_state_table_ids_whitelist
                .contains(&self.table_catalog.id.as_raw_id())
        };
        self.with_preload_all_rows(preload_all_rows)
    }

    pub fn forbid_preload_all_rows(self) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, bool> {
        self.with_preload_all_rows(false)
    }
}

impl<'a, S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool, PreloadAllRow>
    StateTableBuilder<'a, S, SD, IS_REPLICATED, PreloadAllRow>
{
    pub fn with_op_consistency_level(
        mut self,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> Self {
        self.op_consistency_level = Some(op_consistency_level);
        self
    }

    pub fn with_output_column_ids(mut self, output_column_ids: Vec<ColumnId>) -> Self {
        self.output_column_ids = Some(output_column_ids);
        self
    }
}

impl<'a, S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
    StateTableBuilder<'a, S, SD, IS_REPLICATED, bool>
{
    pub async fn build(self) -> StateTableInner<S, SD, IS_REPLICATED> {
        let mut preload_all_rows = self.preload_all_rows;
        if preload_all_rows
            && let Err(e) =
                risingwave_common::license::Feature::StateTableMemoryPreload.check_available()
        {
            warn!(table_id = %self.table_catalog.id, e = %e.as_report(), "table is configured to preload rows into memory, but the feature is unavailable under the current license");
            preload_all_rows = false;
        }
        StateTableInner::from_table_catalog_inner(
            self.table_catalog,
            self.store,
            self.vnodes,
            self.op_consistency_level
                .unwrap_or(StateTableOpConsistencyLevel::ConsistentOldValue),
            self.output_column_ids.unwrap_or_default(),
            preload_all_rows,
        )
        .await
    }
}

impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create a state table from a table catalog, for tests.
    #[cfg(any(test, feature = "test"))]
    pub async fn from_table_catalog(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    /// Create a state table that skips all write sanity checks.
    pub async fn from_table_catalog_inconsistent_op(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    async fn from_table_catalog_inner(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        op_consistency_level: StateTableOpConsistencyLevel,
        output_column_ids: Vec<ColumnId>,
        preload_all_rows: bool,
    ) -> Self {
        let table_id = table_catalog.id;
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();
        let data_types: Vec<DataType> = table_catalog
            .columns
            .iter()
            .map(|col| {
                col.get_column_desc()
                    .unwrap()
                    .get_column_type()
                    .unwrap()
                    .into()
            })
            .collect();
        let order_types: Vec<OrderType> = table_catalog
            .pk
            .iter()
            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
            .collect();
        let dist_key_indices: Vec<usize> = table_catalog
            .distribution_key
            .iter()
            .map(|dist_index| *dist_index as usize)
            .collect();

        let pk_indices = table_catalog
            .pk
            .iter()
            .map(|col_order| col_order.column_index as usize)
            .collect_vec();

        // Compute `dist_key_in_pk_indices` from `dist_key_indices` and `pk_indices`
        // when it is not set in the catalog.
        let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() {
            get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices).unwrap()
        } else {
            table_catalog
                .get_dist_key_in_pk()
                .iter()
                .map(|idx| *idx as usize)
                .collect()
        };

        let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
            let vnode_col_idx = *idx as usize;
            pk_indices.iter().position(|&i| vnode_col_idx == i)
        });

        let distribution =
            TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
        assert_eq!(
            distribution.vnode_count(),
            table_catalog.vnode_count(),
            "vnode count mismatch, scanning table {} under wrong distribution?",
            table_catalog.name,
        );

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);

        let input_value_indices = table_catalog
            .value_indices
            .iter()
            .map(|val| *val as usize)
            .collect_vec();

        let no_shuffle_value_indices = (0..table_columns.len()).collect_vec();

        // If `value_indices` is the identity mapping, store `None` to skip the projection.
        let value_indices = match input_value_indices.len() == table_columns.len()
            && input_value_indices == no_shuffle_value_indices
        {
            true => None,
            false => Some(input_value_indices),
        };
        let prefix_hint_len = table_catalog.read_prefix_len_hint as usize;

        let row_serde = Arc::new(SD::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.clone().into_boxed_slice()),
        ));

        let state_table_op_consistency_level = op_consistency_level;
        let op_consistency_level = match op_consistency_level {
            StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
            StateTableOpConsistencyLevel::ConsistentOldValue => {
                consistent_old_value_op(row_serde.clone(), false)
            }
            StateTableOpConsistencyLevel::LogStoreEnabled => {
                consistent_old_value_op(row_serde.clone(), true)
            }
        };

        let table_option = TableOption::new(table_catalog.retention_seconds);
        let new_local_options = if IS_REPLICATED {
            NewLocalOptions::new_replicated(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
            )
        } else {
            NewLocalOptions::new(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
                true,
            )
        };
        let local_state_store = store.new_local(new_local_options).await;

        // A versioned (schema-changeable) table must use a column-aware value serde.
        assert_eq!(
            table_catalog.version.is_some(),
            row_serde.kind().is_column_aware()
        );

        let output_column_ids_to_input_idx = output_column_ids
            .iter()
            .enumerate()
            .map(|(pos, id)| (*id, pos))
            .collect::<HashMap<_, _>>();

        let columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|c| c.column_desc.as_ref().unwrap().into())
            .collect_vec();

        // Compute the mapping from input columns to output columns.
        let mut i2o_mapping = vec![None; columns.len()];
        for (i, column) in columns.iter().enumerate() {
            if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
                i2o_mapping[i] = Some(*pos);
            }
        }
        let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());

        let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);

        let clean_watermark_indices = table_catalog.get_clean_watermark_column_indices();
        if clean_watermark_indices.len() > 1 {
            unimplemented!("multiple clean watermark columns are not supported yet")
        }
        let clean_watermark_index = clean_watermark_indices.first().map(|&i| i as usize);

        let watermark_serde = clean_watermark_index.map(|idx| {
            let pk_idx = pk_indices.iter().position(|&i| i == idx);
            let (watermark_serde, watermark_serde_type) = match pk_idx {
                Some(0) => (pk_serde.index(0).into_owned(), WatermarkSerdeType::PkPrefix),
                Some(pk_idx) => (
                    pk_serde.index(pk_idx).into_owned(),
                    WatermarkSerdeType::NonPkPrefix,
                ),
                None => (
                    OrderedRowSerde::new(
                        vec![data_types[idx].clone()],
                        vec![OrderType::ascending()],
                    ),
                    WatermarkSerdeType::NonPkPrefix,
                ),
            };
            (watermark_serde, watermark_serde_type)
        });

        // Restore the committed watermark (if any) from the state store.
        let max_watermark_of_vnodes = distribution
            .vnodes()
            .iter_vnodes()
            .filter_map(|vnode| local_state_store.get_table_watermark(vnode))
            .max();
        let committed_watermark = if let Some((deser, WatermarkSerdeType::PkPrefix)) =
            watermark_serde.as_ref()
            && let Some(max_watermark) = max_watermark_of_vnodes
        {
            let deserialized = deser.deserialize(&max_watermark).ok().and_then(|row| {
                assert!(row.len() == 1);
                row[0].clone()
            });
            if deserialized.is_none() {
                tracing::error!(
                    vnodes = ?distribution.vnodes(),
                    watermark = ?max_watermark,
                    "Failed to deserialize persisted watermark from state store.",
                );
            }
            deserialized
        } else {
            None
        };

        Self {
            table_id,
            row_store: StateTableRowStore {
                all_rows: preload_all_rows.then(HashMap::new),
                table_option,
                state_store: local_state_store,
                row_serde,
                pk_serde: pk_serde.clone(),
                table_id,
            },
            store,
            epoch: None,
            pk_serde,
            pk_indices,
            distribution,
            prefix_hint_len,
            value_indices,
            pending_watermark: None,
            committed_watermark,
            watermark_serde,
            data_types,
            output_indices,
            i2o_mapping,
            op_consistency_level: state_table_op_consistency_level,
            clean_watermark_index,
            on_post_commit: false,
        }
    }

    pub fn get_data_types(&self) -> &[DataType] {
        &self.data_types
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    /// Compute the vnode from the given pk prefix, which must be enough to determine it.
    fn compute_prefix_vnode(&self, pk_prefix: &impl Row) -> VirtualNode {
        self.distribution
            .try_compute_vnode_by_pk_prefix(pk_prefix)
            .expect("For streaming, the given prefix must be enough to calculate the vnode")
    }

    pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
        self.distribution.compute_vnode_by_pk(pk)
    }

    /// Note: the returned indices are based on all columns of the table, not the output ones.
    pub fn pk_indices(&self) -> &[usize] {
        &self.pk_indices
    }

    /// Get the indices of the primary-key columns within the output columns. Returns
    /// `None` if any primary-key column is not in the output.
    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
        assert!(IS_REPLICATED);
        self.pk_indices
            .iter()
            .map(|&i| self.output_indices.iter().position(|&j| i == j))
            .collect()
    }

    pub fn pk_serde(&self) -> &OrderedRowSerde {
        &self.pk_serde
    }

    pub fn vnodes(&self) -> &Arc<Bitmap> {
        self.distribution.vnodes()
    }

    pub fn value_indices(&self) -> &Option<Vec<usize>> {
        &self.value_indices
    }

    pub fn is_consistent_op(&self) -> bool {
        matches!(
            self.op_consistency_level,
            StateTableOpConsistencyLevel::ConsistentOldValue
                | StateTableOpConsistencyLevel::LogStoreEnabled
        )
    }
}

impl<S, SD> StateTableInner<S, SD, true>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create a replicated state table that reads a subset of columns, given by
    /// `output_column_ids`.
    pub async fn new_replicated(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .with_output_column_ids(output_column_ids)
            .forbid_preload_all_rows()
            .build()
            .await
    }
}

impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Get a single row from the state table.
    pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        let row = self.row_store.get(serialized_pk, prefix_hint).await?;
        match row {
            Some(row) => {
                if IS_REPLICATED {
                    // A replicated table may output only a subset of the columns.
                    let row = row.project(&self.output_indices);
                    Ok(Some(row.into_owned_row()))
                } else {
                    Ok(Some(row))
                }
            }
            None => Ok(None),
        }
    }

    /// Check whether a row with the given pk exists.
    pub async fn exists(&self, pk: impl Row) -> StreamExecutorResult<bool> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        self.row_store.exists(serialized_pk, prefix_hint).await
    }

    /// Serialize the given pk into bytes, prefixed with its vnode.
    fn serialize_pk(&self, pk: &impl Row) -> TableKey<Bytes> {
        assert!(pk.len() <= self.pk_indices.len());
        serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk))
    }

    fn serialize_pk_and_get_prefix_hint(&self, pk: &impl Row) -> (TableKey<Bytes>, Option<Bytes>) {
        let serialized_pk = self.serialize_pk(&pk);
        let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() {
            Some(serialized_pk.slice(VirtualNode::SIZE..))
        } else {
            #[cfg(debug_assertions)]
            if self.prefix_hint_len != 0 {
                warn!(
                    "prefix_hint_len is not equal to pk.len(), may not be able to utilize bloom filter"
                );
            }
            None
        };
        (serialized_pk, prefix_hint)
    }
}

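// Point reads on the row store: serve from the in-memory `all_rows` copy when row
// preloading is enabled, otherwise go to the state store via `on_key_value`.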
impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn get(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<Option<OwnedRow>> {
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode();
            return Ok(rows.get(&vnode).expect("covered vnode").get(key).cloned());
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };

        self.state_store
            .on_key_value(key_bytes, read_options, move |_, value| {
                let row = self.row_serde.deserialize(value)?;
                Ok(OwnedRow::new(row))
            })
            .await
            .map_err(Into::into)
    }

    async fn exists(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<bool> {
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode();
            return Ok(rows.get(&vnode).expect("covered vnode").contains_key(key));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };
        let result = self
            .state_store
            .on_key_value(key_bytes, read_options, move |_, _| Ok(()))
            .await?;
        Ok(result.is_some())
    }
}

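/// Handle returned by `commit`. The `#[must_use]` forces the caller to finish the
/// commit by calling `post_yield_barrier` after the barrier has been yielded, at which
/// point the vnode bitmap may be updated.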
#[must_use]
pub struct StateTablePostCommit<'a, S, SD = BasicSerde, const IS_REPLICATED: bool = false>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    inner: &'a mut StateTableInner<S, SD, IS_REPLICATED>,
}

impl<'a, S, SD, const IS_REPLICATED: bool> StateTablePostCommit<'a, S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub async fn post_yield_barrier(
        mut self,
        new_vnodes: Option<Arc<Bitmap>>,
    ) -> StreamExecutorResult<
        Option<(
            (
                Arc<Bitmap>,
                Arc<Bitmap>,
                &'a mut StateTableInner<S, SD, IS_REPLICATED>,
            ),
            bool,
        )>,
    > {
        self.inner.on_post_commit = false;
        Ok(if let Some(new_vnodes) = new_vnodes {
            let (old_vnodes, cache_may_stale) =
                self.update_vnode_bitmap(new_vnodes.clone()).await?;
            Some(((new_vnodes, old_vnodes, self.inner), cache_may_stale))
        } else {
            None
        })
    }

    pub fn inner(&self) -> &StateTableInner<S, SD, IS_REPLICATED> {
        &*self.inner
    }

    /// Update the vnode bitmap of the state table, returning the previous vnode bitmap
    /// and whether caches built on top of this table may have become stale.
    async fn update_vnode_bitmap(
        &mut self,
        new_vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<(Arc<Bitmap>, bool)> {
        let prev_vnodes = self
            .inner
            .row_store
            .update_vnode_bitmap(new_vnodes.clone())
            .await?;
        assert_eq!(
            &prev_vnodes,
            self.inner.vnodes(),
            "state table and state store vnode bitmap mismatches"
        );

        if self.inner.distribution.is_singleton() {
            assert_eq!(
                &new_vnodes,
                self.inner.vnodes(),
                "should not update vnode bitmap for singleton table"
            );
        }
        assert_eq!(self.inner.vnodes().len(), new_vnodes.len());

        let cache_may_stale = cache_may_stale(self.inner.vnodes(), &new_vnodes);

        if cache_may_stale {
            self.inner.pending_watermark = None;
        }

        Ok((
            self.inner.distribution.update_vnode_bitmap(new_vnodes),
            cache_may_stale,
        ))
    }
}

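// Write path of the row store: each mutation is applied both to the in-memory
// `all_rows` copy (when row preloading is enabled) and to the state store mem-table.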
impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    fn handle_mem_table_error(&self, e: StorageError) {
        let e = match e.into_inner() {
            ErrorKind::MemTable(e) => e,
            _ => unreachable!("should only get memtable error"),
        };
        match *e {
            MemTableError::InconsistentOperation { key, prev, new, .. } => {
                let (vnode, key) = deserialize_pk_with_vnode(&key, &self.pk_serde).unwrap();
                panic!(
                    "mem-table operation inconsistent! table_id: {}, vnode: {}, key: {:?}, prev: {}, new: {}",
                    self.table_id,
                    vnode,
                    &key,
                    prev.debug_fmt(&*self.row_serde),
                    new.debug_fmt(&*self.row_serde),
                )
            }
        }
    }

    fn insert(&mut self, key: TableKey<Bytes>, value: impl Row) {
        insane_mode_discard_point!();
        let value_bytes = self.row_serde.serialize(&value).into();
        if let Some(rows) = &mut self.all_rows {
            let (vnode, key) = key.split_vnode_bytes();
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .insert(key, value.into_owned_row());
        }
        self.state_store
            .insert(key, value_bytes, None)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn delete(&mut self, key: TableKey<Bytes>, value: impl Row) {
        insane_mode_discard_point!();
        let value_bytes = self.row_serde.serialize(value).into();
        if let Some(rows) = &mut self.all_rows {
            let (vnode, key) = key.split_vnode();
            rows.get_mut(&vnode).expect("covered vnode").remove(key);
        }
        self.state_store
            .delete(key, value_bytes)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn update(&mut self, key_bytes: TableKey<Bytes>, old_value: impl Row, new_value: impl Row) {
        insane_mode_discard_point!();
        let new_value_bytes = self.row_serde.serialize(&new_value).into();
        let old_value_bytes = self.row_serde.serialize(old_value).into();
        if let Some(rows) = &mut self.all_rows {
            let (vnode, key) = key_bytes.split_vnode_bytes();
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .insert(key, new_value.into_owned_row());
        }
        self.state_store
            .insert(key_bytes, new_value_bytes, Some(old_value_bytes))
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }
}

impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Insert a row into the state table. The whole row must be given; the pk is
    /// extracted from it via `pk_indices`.
    pub fn insert(&mut self, value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&value).project(pk_indices);

        let key_bytes = self.serialize_pk(&pk);
        dispatch_value_indices!(&self.value_indices, [value], {
            self.row_store.insert(key_bytes, value)
        })
    }

    /// Delete a row from the state table. The whole old row must be given.
    pub fn delete(&mut self, old_value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&old_value).project(pk_indices);

        let key_bytes = self.serialize_pk(&pk);
        dispatch_value_indices!(&self.value_indices, [old_value], {
            self.row_store.delete(key_bytes, old_value)
        })
    }

    /// Update a row in place. The old and new rows must share the same pk.
    pub fn update(&mut self, old_value: impl Row, new_value: impl Row) {
        let old_pk = (&old_value).project(self.pk_indices());
        let new_pk = (&new_value).project(self.pk_indices());
        debug_assert!(
            Row::eq(&old_pk, new_pk),
            "pk should not change: {old_pk:?} vs {new_pk:?}, table_id: {}",
            self.table_id
        );

        let key_bytes = self.serialize_pk(&new_pk);
        dispatch_value_indices!(&self.value_indices, [old_value, new_value], {
            self.row_store.update(key_bytes, old_value, new_value)
        })
    }

    /// Write a record (insert / delete / update) into the state table.
    pub fn write_record(&mut self, record: Record<impl Row>) {
        match record {
            Record::Insert { new_row } => self.insert(new_row),
            Record::Delete { old_row } => self.delete(old_row),
            Record::Update { old_row, new_row } => self.update(old_row, new_row),
        }
    }

    fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
        fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
    }

    /// Write a stream chunk into the state table, applying each visible op in order.
    #[allow(clippy::disallowed_methods)]
    pub fn write_chunk(&mut self, chunk: StreamChunk) {
        let chunk = if IS_REPLICATED {
            self.fill_non_output_indices(chunk)
        } else {
            chunk
        };

        let vnodes = self
            .distribution
            .compute_chunk_vnode(&chunk, &self.pk_indices);

        for (idx, optional_row) in chunk.rows_with_holes().enumerate() {
            let Some((op, row)) = optional_row else {
                continue;
            };
            let pk = row.project(&self.pk_indices);
            let vnode = vnodes[idx];
            let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, vnode);
            match op {
                Op::Insert | Op::UpdateInsert => {
                    dispatch_value_indices!(&self.value_indices, [row], {
                        self.row_store.insert(key_bytes, row);
                    });
                }
                Op::Delete | Op::UpdateDelete => {
                    dispatch_value_indices!(&self.value_indices, [row], {
                        self.row_store.delete(key_bytes, row);
                    });
                }
            }
        }
    }

    /// Update the state-cleaning watermark; it takes effect on the next `commit`.
    pub fn update_watermark(&mut self, watermark: ScalarImpl) {
        trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark");
        self.pending_watermark = Some(watermark);
    }

    /// Get the latest committed watermark, if any.
    pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> {
        self.committed_watermark.as_ref()
    }

    pub fn cleaned_by_watermark(&self) -> bool {
        self.clean_watermark_index.is_some()
    }

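    /// Commit pending mutations and the pending watermark at a barrier, sealing the
    /// current epoch. The returned [`StateTablePostCommit`] must be used to finish the
    /// commit after the barrier is yielded. A sketch of the per-barrier flow
    /// (surrounding names are illustrative, not part of this API):
    ///
    /// ```ignore
    /// let post_commit = table.commit(barrier_epoch).await?;
    /// yield Message::Barrier(barrier);
    /// post_commit.post_yield_barrier(update_vnode_bitmap).await?;
    /// ```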
    pub async fn commit(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
        self.commit_inner(new_epoch, None).await
    }

    #[cfg(test)]
    pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
        self.commit_assert_no_update_vnode_bitmap(new_epoch).await
    }

    pub async fn commit_assert_no_update_vnode_bitmap(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<()> {
        let post_commit = self.commit_inner(new_epoch, None).await?;
        post_commit.post_yield_barrier(None).await?;
        Ok(())
    }

    pub async fn commit_may_switch_consistent_op(
        &mut self,
        new_epoch: EpochPair,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
        if self.op_consistency_level != op_consistency_level {
            if !cfg!(debug_assertions) {
                info!(
                    ?new_epoch,
                    prev_op_consistency_level = ?self.op_consistency_level,
                    ?op_consistency_level,
                    table_id = %self.table_id,
                    "switch to new op consistency level"
                );
            }
            self.commit_inner(new_epoch, Some(op_consistency_level))
                .await
        } else {
            self.commit_inner(new_epoch, None).await
        }
    }

    async fn commit_inner(
        &mut self,
        new_epoch: EpochPair,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
        assert!(!self.on_post_commit);
        assert_eq!(
            self.epoch.expect("should only be called after init").curr,
            new_epoch.prev
        );
        if let Some(new_consistency_level) = switch_consistent_op {
            assert_ne!(self.op_consistency_level, new_consistency_level);
            self.op_consistency_level = new_consistency_level;
        }
        trace!(
            table_id = %self.table_id,
            epoch = ?self.epoch,
            "commit state table"
        );

        let table_watermarks = self.commit_pending_watermark();
        self.row_store
            .seal_current_epoch(new_epoch.curr, table_watermarks, switch_consistent_op)
            .await?;
        self.epoch = Some(new_epoch);

        self.on_post_commit = true;
        Ok(StateTablePostCommit { inner: self })
    }

    /// Commit the pending watermark, returning what should be written to the state store.
    fn commit_pending_watermark(
        &mut self,
    ) -> Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)> {
        let watermark = self.pending_watermark.take()?;
        trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning");

        assert!(
            !self.pk_indices().is_empty(),
            "see pending watermark on empty pk"
        );
        let (watermark_serializer, watermark_type) = self
            .watermark_serde
            .as_ref()
            .expect("watermark serde should be initialized to commit watermark");

        let watermark_suffix =
            serialize_pk(row::once(Some(watermark.clone())), watermark_serializer);
        let vnode_watermark = VnodeWatermark::new(
            self.vnodes().clone(),
            Bytes::copy_from_slice(watermark_suffix.as_ref()),
        );

        trace!(table_id = %self.table_id, ?vnode_watermark, "table watermark");

        let order_type = watermark_serializer.get_order_types().first().unwrap();
        let direction = if order_type.is_ascending() {
            WatermarkDirection::Ascending
        } else {
            WatermarkDirection::Descending
        };

        self.committed_watermark = Some(watermark);
        Some((direction, vec![vnode_watermark], *watermark_type))
    }

    pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.row_store.try_flush().await?;
        Ok(())
    }
}

pub trait RowStream<'a>: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a {}
impl<'a, S: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a> RowStream<'a> for S {}

pub trait KeyedRowStream<'a>: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a {}
impl<'a, S: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a> KeyedRowStream<'a> for S {}

pub trait PkRowStream<'a, K>: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a {}
impl<'a, K, S: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a> PkRowStream<'a, K> for S {}

/// Reconstruct a key of type `K` from a vnode and the key bytes that follow it. The `()`
/// impl allows key-less iteration without paying for key copies.
pub trait FromVnodeBytes {
    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self;
}

impl FromVnodeBytes for Bytes {
    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self {
        prefix_slice_with_vnode(vnode, bytes)
    }
}

impl FromVnodeBytes for () {
    fn from_vnode_bytes(_vnode: VirtualNode, _bytes: &Bytes) -> Self {}
}

impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Iterate rows of the given vnode within `pk_range`.
    pub async fn iter_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_kv_with_pk_range::<()>(pk_range, vnode, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

    pub async fn iter_keyed_row_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    pub async fn iter_with_vnode_and_output_indices(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        assert!(IS_REPLICATED);
        let stream = self
            .iter_with_vnode(vnode, pk_range, prefetch_options)
            .await?;
        Ok(stream.map(|row| row.map(|row| row.project(&self.output_indices).into_owned_row())))
    }
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    /// Iterate over a key range of a single vnode in ascending key order, serving from
    /// the in-memory copy when row preloading is enabled.
    async fn iter_kv<K: CopyFromSlice + FromVnodeBytes>(
        &self,
        vnode: VirtualNode,
        (start, end): (Bound<Bytes>, Bound<Bytes>),
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        if let Some(rows) = &self.all_rows {
            return Ok(futures::future::Either::Left(futures::stream::iter(
                rows.get(&vnode)
                    .expect("covered vnode")
                    .range((start, end))
                    .map(move |(key, value)| Ok((K::from_vnode_bytes(vnode, key), value.clone()))),
            )));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(futures::future::Either::Right(
            deserialize_keyed_row_stream(
                self.state_store
                    .iter(prefixed_range_with_vnode((start, end), vnode), read_options)
                    .await?,
                &*self.row_serde,
            ),
        ))
    }

    /// Same as `iter_kv`, but in descending key order.
    async fn rev_iter_kv<K: CopyFromSlice + FromVnodeBytes>(
        &self,
        vnode: VirtualNode,
        (start, end): (Bound<Bytes>, Bound<Bytes>),
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        if let Some(rows) = &self.all_rows {
            return Ok(futures::future::Either::Left(futures::stream::iter(
                rows.get(&vnode)
                    .expect("covered vnode")
                    .range((start, end))
                    .rev()
                    .map(move |(key, value)| Ok((K::from_vnode_bytes(vnode, key), value.clone()))),
            )));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(futures::future::Either::Right(
            deserialize_keyed_row_stream(
                self.state_store
                    .rev_iter(prefixed_range_with_vnode((start, end), vnode), read_options)
                    .await?,
                &*self.row_serde,
            ),
        ))
    }
}

impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
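    /// Iterate rows whose pk starts with `pk_prefix`, optionally restricted by
    /// `sub_range` on the remaining pk columns. The prefix must be enough to compute
    /// the vnode. A usage sketch (assuming a table keyed by `(group_id, ts)` and an
    /// in-scope `table`; the names are illustrative):
    ///
    /// ```ignore
    /// let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
    /// let stream = table
    ///     .iter_with_prefix(row::once(Some(group_id.into())), sub_range, Default::default())
    ///     .await?;
    /// pin_mut!(stream);
    /// while let Some(row) = stream.try_next().await? {
    ///     // ...
    /// }
    /// ```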
    pub async fn iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        let stream = self
            .iter_with_prefix_inner::<false, ()>(pk_prefix, sub_range, prefetch_options)
            .await?;
        Ok(stream.map_ok(|(_, row)| row))
    }

    /// Get the row from a state table that stores at most one row.
    pub async fn get_from_one_row_table(&self) -> StreamExecutorResult<Option<OwnedRow>> {
        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
        let stream = self
            .iter_with_prefix(row::empty(), sub_range, Default::default())
            .await?;
        pin_mut!(stream);

        if let Some(res) = stream.next().await {
            let value = res?.into_owned_row();
            assert!(stream.next().await.is_none());
            Ok(Some(value))
        } else {
            Ok(None)
        }
    }

    /// Get the scalar value from a state table that stores at most one row whose first
    /// column is the value.
    pub async fn get_from_one_value_table(&self) -> StreamExecutorResult<Option<ScalarImpl>> {
        Ok(self
            .get_from_one_row_table()
            .await?
            .and_then(|row| row[0].clone()))
    }

    pub async fn iter_keyed_row_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::<false, Bytes>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    /// Same as `iter_with_prefix`, but in reverse order.
    pub async fn rev_iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::<true, ()>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

    async fn iter_with_prefix_inner<const REVERSE: bool, K: CopyFromSlice + FromVnodeBytes>(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
        let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);

        // The given prefix must be enough to calculate the vnode, so the scan stays
        // within a single vnode.
        let vnode = self.compute_prefix_vnode(&pk_prefix);

        let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
        if self.prefix_hint_len != 0 {
            debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
        }
        // Construct the prefix hint for the bloom filter, if applicable.
        let prefix_hint = {
            if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() {
                None
            } else {
                let encoded_prefix_len = self
                    .pk_serde
                    .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;

                Some(Bytes::copy_from_slice(
                    &encoded_prefix[..encoded_prefix_len],
                ))
            }
        };

        trace!(
            table_id = %self.table_id(),
            ?prefix_hint, ?pk_prefix,
            ?pk_prefix_indices,
            iter_direction = if REVERSE { "reverse" } else { "forward" },
            "storage_iter_with_prefix"
        );

        let memcomparable_range =
            prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);

        Ok(if REVERSE {
            futures::future::Either::Left(
                self.row_store
                    .rev_iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
                    .await?,
            )
        } else {
            futures::future::Either::Right(
                self.row_store
                    .iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
                    .await?,
            )
        })
    }

    /// Iterate key-value pairs of a single vnode within `pk_range`.
    async fn iter_kv_with_pk_range<'a, K: CopyFromSlice + FromVnodeBytes>(
        &'a self,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'a, K>> {
        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);

        self.row_store
            .iter_kv(vnode, memcomparable_range, None, prefetch_options)
            .await
    }
}

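/// Turn a raw state-store iterator into a stream of `(key, row)` pairs, deserializing
/// each value with the given row serde.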
fn deserialize_keyed_row_stream<'a, K: CopyFromSlice>(
    iter: impl StateStoreIter + 'a,
    deserializer: &'a impl ValueRowSerde,
) -> impl PkRowStream<'a, K> {
    iter.into_stream(move |(key, value)| {
        Ok((
            K::copy_from_slice(key.user_key.table_key.as_ref()),
            deserializer.deserialize(value).map(OwnedRow::new)?,
        ))
    })
    .map_err(Into::into)
}

pub fn prefix_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    range: &(Bound<impl Row>, Bound<impl Row>),
) -> (Bound<Bytes>, Bound<Bytes>) {
    (
        start_range_to_memcomparable(pk_serde, &range.0),
        end_range_to_memcomparable(pk_serde, &range.1, None),
    )
}

fn prefix_and_sub_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    sub_range: &(Bound<impl Row>, Bound<impl Row>),
    pk_prefix: impl Row,
) -> (Bound<Bytes>, Bound<Bytes>) {
    let (range_start, range_end) = sub_range;
    let prefix_serializer = pk_serde.prefix(pk_prefix.len());
    let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
    let start_range = match range_start {
        Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))),
        Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))),
        Unbounded => Bound::Included(Either::Right(&pk_prefix)),
    };
    let end_range = match range_end {
        Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)),
        Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)),
        Unbounded => Unbounded,
    };
    (
        start_range_to_memcomparable(pk_serde, &start_range),
        end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
    )
}

fn start_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => Unbounded,
        Included(r) => {
            let serialized = serialize_pk_prefix(r);

            Included(serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);

            // Skip rows sharing the excluded prefix entirely by starting at the next prefix.
            start_bound_of_excluded_prefix(&serialized)
        }
    }
}

fn end_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
    serialized_pk_prefix: Option<Bytes>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => match serialized_pk_prefix {
            Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix),
            None => Unbounded,
        },
        Included(r) => {
            let serialized = serialize_pk_prefix(r);

            end_bound_of_prefix(&serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            Excluded(serialized)
        }
    }
}

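/// Fill the columns that are not in the output with NULLs, so that a replicated chunk
/// (containing only the output columns) matches the full table schema.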
fn fill_non_output_indices(
    i2o_mapping: &ColIndexMapping,
    data_types: &[DataType],
    chunk: StreamChunk,
) -> StreamChunk {
    let cardinality = chunk.cardinality();
    let (ops, columns, vis) = chunk.into_inner();
    let mut full_columns = Vec::with_capacity(data_types.len());
    for (i, data_type) in data_types.iter().enumerate() {
        if let Some(j) = i2o_mapping.try_map(i) {
            full_columns.push(columns[j].clone());
        } else {
            let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
            column_builder.append_n_null(cardinality);
            let column: ArrayRef = column_builder.finish().into();
            full_columns.push(column)
        }
    }
    let data_chunk = DataChunk::new(full_columns, vis);
    StreamChunk::from_parts(ops, data_chunk)
}

#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    fn check(actual: impl Debug, expect: Expect) {
        let actual = format!("{:#?}", actual);
        expect.assert_eq(&actual);
    }

    #[test]
    fn test_fill_non_output_indices() {
        let data_types = vec![DataType::Int32, DataType::Int32, DataType::Int32];
        let replicated_chunk = [OwnedRow::new(vec![
            Some(222_i32.into()),
            Some(2_i32.into()),
        ])];
        let replicated_chunk = StreamChunk::from_parts(
            vec![Op::Insert],
            DataChunk::from_rows(&replicated_chunk, &[DataType::Int32, DataType::Int32]),
        );
        let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
        let filled_chunk = fill_non_output_indices(&i2o_mapping, &data_types, replicated_chunk);
        check(
            filled_chunk,
            expect![[r#"
                StreamChunk { cardinality: 1, capacity: 1, data:
                +---+---+---+-----+
                | + | 2 |   | 222 |
                +---+---+---+-----+
                }"#]],
        );
    }
}