use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;
use std::ops::Bound;
use std::ops::Bound::*;
use std::sync::Arc;
use std::time::Instant;

use anyhow::anyhow;
use bytes::Bytes;
use either::Either;
use foyer::Hint;
use futures::future::{ready, try_join_all};
use futures::stream::BoxStream;
use futures::{Stream, StreamExt, TryStreamExt, pin_mut};
use itertools::Itertools;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnDesc, ColumnId, TableId, TableOption, get_dist_key_in_pk_indices,
};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
use risingwave_common::row::{self, OwnedRow, Row, RowExt};
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{
    CopyFromSlice, TableKey, end_bound_of_prefix, next_key, prefix_slice_with_vnode,
    prefixed_range_with_vnode, start_bound_of_excluded_prefix,
};
use risingwave_hummock_sdk::table_watermark::{
    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_pb::catalog::Table;
use risingwave_storage::StateStore;
use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::mem_table::MemTableError;
use risingwave_storage::row_serde::find_columns_by_ids;
use risingwave_storage::row_serde::row_serde_util::{
    deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode, serialize_row,
};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::*;
use risingwave_storage::table::{KeyedRow, TableDistribution};
use thiserror_ext::AsReport;
use tracing::{Instrument, trace};

use crate::cache::keyed_cache_may_stale;
use crate::executor::monitor::streaming_stats::StateTableMetrics;
use crate::executor::{StreamExecutorError, StreamExecutorResult};

macro_rules! insane_mode_discard_point {
    () => {{
        use rand::Rng;
        if crate::consistency::insane() && rand::rng().random_bool(0.3) {
            return;
        }
    }};
}

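/// Per-vnode min/max key statistics. The span is seeded from storage at init
/// and only ever widened by writes, so a point get or range scan that falls
/// entirely outside `[min_key, max_key]` can skip the state store.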
struct VnodeStatistics {
    min_key: Option<Bytes>,
    max_key: Option<Bytes>,
}

impl VnodeStatistics {
    fn new() -> Self {
        Self {
            min_key: None,
            max_key: None,
        }
    }

    fn update_with_key(&mut self, key: &Bytes) {
        if let Some(min) = &self.min_key {
            if key < min {
                self.min_key = Some(key.clone());
            }
        } else {
            self.min_key = Some(key.clone());
        }

        if let Some(max) = &self.max_key {
            if key > max {
                self.max_key = Some(key.clone());
            }
        } else {
            self.max_key = Some(key.clone());
        }
    }

    fn can_prune(&self, key: &Bytes) -> bool {
        if let Some(min) = &self.min_key
            && key < min
        {
            return true;
        }
        if let Some(max) = &self.max_key
            && key > max
        {
            return true;
        }
        false
    }

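    // Range pruning against the recorded [min_key, max_key] span. For example,
    // with min_key = b"b" and max_key = b"d", the range starting at
    // Included(b"e") can be pruned entirely, while [Included(b"a"),
    // Included(b"c")] is clamped by `pruned_key_range` to
    // [Included(b"b"), Included(b"c")].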
    fn can_prune_range(&self, start: &Bound<Bytes>, end: &Bound<Bytes>) -> bool {
        if let Some(max) = &self.max_key {
            match start {
                Included(s) if s > max => return true,
                Excluded(s) if s >= max => return true,
                _ => {}
            }
        }
        if let Some(min) = &self.min_key {
            match end {
                Included(e) if e < min => return true,
                Excluded(e) if e <= min => return true,
                _ => {}
            }
        }
        false
    }

    fn pruned_key_range(
        &self,
        start: &Bound<Bytes>,
        end: &Bound<Bytes>,
    ) -> Option<(Bound<Bytes>, Bound<Bytes>)> {
        if self.can_prune_range(start, end) {
            return None;
        }
        let new_start = if let Some(min) = &self.min_key {
            match start {
                Included(s) if s <= min => Included(min.clone()),
                Excluded(s) if s < min => Included(min.clone()),
                _ => start.clone(),
            }
        } else {
            start.clone()
        };

        let new_end = if let Some(max) = &self.max_key {
            match end {
                Included(e) if e >= max => Included(max.clone()),
                Excluded(e) if e > max => Included(max.clone()),
                _ => end.clone(),
            }
        } else {
            end.clone()
        };

        Some((new_start, new_end))
    }
}

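/// `StateTableInner` is the abstraction streaming executors use to read and
/// write their state through a [`StateStore`]. `SD` is the value-row serde;
/// `IS_REPLICATED` marks replicated tables that expose only a subset of
/// columns (`output_indices`) to readers.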
pub struct StateTableInner<S, SD = BasicSerde, const IS_REPLICATED: bool = false>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    table_id: TableId,

    row_store: StateTableRowStore<S::Local, SD>,

    store: S,

    epoch: Option<EpochPair>,

    pk_serde: OrderedRowSerde,

    pk_indices: Vec<usize>,

    distribution: TableDistribution,

    prefix_hint_len: usize,

    value_indices: Option<Vec<usize>>,

    pub clean_watermark_index: Option<usize>,
    pending_watermark: Option<ScalarImpl>,
    committed_watermark: Option<ScalarImpl>,
    watermark_serde: Option<(OrderedRowSerde, WatermarkSerdeType)>,

    data_types: Vec<DataType>,

    i2o_mapping: ColIndexMapping,

    pub output_indices: Vec<usize>,

    op_consistency_level: StateTableOpConsistencyLevel,

    on_post_commit: bool,
}

pub type StateTable<S> = StateTableInner<S, BasicSerde>;
pub type ReplicatedStateTable<S, SD> = StateTableInner<S, SD, true>;

impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.row_store
            .init(epoch, self.distribution.vnodes())
            .await?;
        assert_eq!(None, self.epoch.replace(epoch), "should not init twice");
        Ok(())
    }

    pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
        self.store
            .try_wait_epoch(
                HummockReadEpoch::Committed(prev_epoch),
                TryWaitEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await
    }

    pub fn state_store(&self) -> &S {
        &self.store
    }
}

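/// Sanity-check closure for consistent-old-value mode: two old-value images
/// match if their bytes are equal, or if they deserialize to the same row
/// (serialized bytes may legitimately differ for equal rows, e.g. across
/// serde versions).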
fn consistent_old_value_op(
    row_serde: Arc<impl ValueRowSerde>,
    is_log_store: bool,
) -> OpConsistencyLevel {
    OpConsistencyLevel::ConsistentOldValue {
        check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
            if first == second {
                return true;
            }
            let first = match row_serde.deserialize(first) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?first, "fail to deserialize serialized value");
                    return false;
                }
            };
            let second = match row_serde.deserialize(second) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?second, "fail to deserialize serialized value");
                    return false;
                }
            };
            if first != second {
                error!(first = ?first, second = ?second, "sanity check fail");
                false
            } else {
                true
            }
        }),
        is_log_store,
    }
}

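/// Rebinds the given row variable(s) to their projection onto `value_indices`
/// (when the table stores only a subset of columns) before evaluating `$body`;
/// otherwise the rows are passed through unchanged.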
macro_rules! dispatch_value_indices {
    ($value_indices:expr, [$($row_var_name:ident),+], $body:expr) => {
        if let Some(value_indices) = $value_indices {
            $(
                let $row_var_name = $row_var_name.project(value_indices);
            )+
            $body
        } else {
            $body
        }
    };
}

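/// Write-through row storage behind the state table. Besides the local state
/// store, it optionally keeps a full in-memory copy of all rows per vnode
/// (`all_rows`, for tables configured to preload) and per-vnode min/max key
/// statistics (`vnode_stats`, for read pruning). At most one of the two is
/// active at a time.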
struct StateTableRowStore<LS: LocalStateStore, SD: ValueRowSerde> {
    state_store: LS,
    all_rows: Option<HashMap<VirtualNode, BTreeMap<Bytes, OwnedRow>>>,

    table_id: TableId,
    row_serde: Arc<SD>,
    pk_serde: OrderedRowSerde,

    vnode_stats: Option<HashMap<VirtualNode, VnodeStatistics>>,
    pub metrics: Option<StateTableMetrics>,
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
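    /// Seeds `vnode_stats` by probing storage once per vnode: a forward
    /// iterator yields the smallest key and a reverse iterator the largest.
    /// No-op unless vnode-key pruning is enabled.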
    async fn may_load_vnode_stats(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        if self.vnode_stats.is_none() {
            return Ok(());
        }

        assert!(self.all_rows.is_none());

        let start_time = Instant::now();
        let mut stats_map = HashMap::new();

        for vnode in vnode_bitmap.iter_vnodes() {
            let mut stats = VnodeStatistics::new();

            let memcomparable_range_with_vnode = prefixed_range_with_vnode::<Bytes>(.., vnode);
            let read_options = ReadOptions {
                cache_policy: CachePolicy::Fill(Hint::Low),
                ..Default::default()
            };

            let mut iter = self
                .state_store
                .iter(memcomparable_range_with_vnode.clone(), read_options.clone())
                .await?;
            if let Some(item) = iter.try_next().await? {
                let (key_vnode, key_without_vnode) = item.0.user_key.table_key.split_vnode();
                assert_eq!(vnode, key_vnode);
                stats.min_key = Some(Bytes::copy_from_slice(key_without_vnode));
            }

            let mut rev_iter = self
                .state_store
                .rev_iter(memcomparable_range_with_vnode, read_options)
                .await?;
            if let Some(item) = rev_iter.try_next().await? {
                let (key_vnode, key_without_vnode) = item.0.user_key.table_key.split_vnode();
                assert_eq!(vnode, key_vnode);
                stats.max_key = Some(Bytes::copy_from_slice(key_without_vnode));
            }

            stats_map.insert(vnode, stats);
        }

        self.vnode_stats = Some(stats_map);

        if !cfg!(debug_assertions) {
            info!(
                table_id = %self.table_id,
                vnode_count = vnode_bitmap.count_ones(),
                duration = ?start_time.elapsed(),
                "finished initializing vnode statistics"
            );
        }

        Ok(())
    }

    async fn may_reload_all_rows(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        if let Some(rows) = &mut self.all_rows {
            rows.clear();
            let start_time = Instant::now();
            *rows = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| {
                let state_store = &self.state_store;
                let row_serde = &self.row_serde;
                async move {
                    let mut rows = BTreeMap::new();
                    let memcomparable_range_with_vnode =
                        prefixed_range_with_vnode::<Bytes>(.., vnode);
                    let stream = deserialize_keyed_row_stream::<Bytes>(
                        state_store
                            .iter(
                                memcomparable_range_with_vnode,
                                ReadOptions {
                                    prefix_hint: None,
                                    prefetch_options: Default::default(),
                                    cache_policy: Default::default(),
                                },
                            )
                            .await?,
                        &**row_serde,
                    );
                    pin_mut!(stream);
                    while let Some((encoded_key, row)) = stream.try_next().await? {
                        let key = TableKey(encoded_key);
                        let (iter_vnode, key) = key.split_vnode_bytes();
                        assert_eq!(vnode, iter_vnode);
                        rows.try_insert(key, row).expect("non-duplicated");
                    }
                    Ok((vnode, rows)) as StreamExecutorResult<_>
                }
            }))
            .await?
            .into_iter()
            .collect();
            if !cfg!(debug_assertions) {
                info!(
                    table_id = %self.table_id,
                    vnode_count = vnode_bitmap.count_ones(),
                    duration = ?start_time.elapsed(),
                    "finished reloading all rows"
                );
            }
        }
        Ok(())
    }

    async fn init(&mut self, epoch: EpochPair, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        self.state_store.init(InitOptions::new(epoch)).await?;
        self.may_reload_all_rows(vnode_bitmap).await?;
        self.may_load_vnode_stats(vnode_bitmap).await
    }

    async fn update_vnode_bitmap(
        &mut self,
        vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<Arc<Bitmap>> {
        let prev_vnodes = self.state_store.update_vnode_bitmap(vnodes.clone()).await?;
        self.may_reload_all_rows(&vnodes).await?;
        self.may_load_vnode_stats(&vnodes).await?;

        Ok(prev_vnodes)
    }

    async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.state_store.try_flush().await?;
        Ok(())
    }

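    /// Flushes the mem-table and seals the current epoch. When rows are
    /// preloaded, a pk-prefix watermark also evicts the now-cleaned keys from
    /// `all_rows`; non-pk-prefix and value watermarks cannot be applied to the
    /// in-memory copy, so preloading is turned off for the table instead.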
    async fn seal_current_epoch(
        &mut self,
        next_epoch: u64,
        table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<()> {
        if let Some((direction, watermarks, serde_type)) = &table_watermarks
            && let Some(rows) = &mut self.all_rows
        {
            match serde_type {
                WatermarkSerdeType::PkPrefix => {
                    for vnode_watermark in watermarks {
                        match direction {
                            WatermarkDirection::Ascending => {
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    *rows = rows.split_off(vnode_watermark.watermark());
                                }
                            }
                            WatermarkDirection::Descending => {
                                let split_off_key = next_key(vnode_watermark.watermark());
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    rows.split_off(split_off_key.as_slice());
                                }
                            }
                        }
                    }
                }
                WatermarkSerdeType::NonPkPrefix => {
                    warn!(table_id = %self.table_id, "row preloading disabled: a non-pk-prefix watermark was written");
                    self.all_rows = None;
                }
                WatermarkSerdeType::Value => {
                    warn!(table_id = %self.table_id, "row preloading disabled: a value watermark was written");
                    self.all_rows = None;
                }
            }
        }
        self.state_store
            .flush()
            .instrument(tracing::info_span!("state_table_flush"))
            .await?;
        let switch_op_consistency_level =
            switch_consistent_op.map(|new_consistency_level| match new_consistency_level {
                StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
                StateTableOpConsistencyLevel::ConsistentOldValue => {
                    consistent_old_value_op(self.row_serde.clone(), false)
                }
                StateTableOpConsistencyLevel::LogStoreEnabled => {
                    consistent_old_value_op(self.row_serde.clone(), true)
                }
            });
        self.state_store.seal_current_epoch(
            next_epoch,
            SealCurrentEpochOptions {
                table_watermarks,
                switch_op_consistency_level,
            },
        );
        Ok(())
    }
}

#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
    Inconsistent,
    ConsistentOldValue,
    LogStoreEnabled,
}

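/// Builder for [`StateTableInner`]. The `PreloadAllRow` type parameter forces
/// callers to decide on row preloading before `build()` becomes available.
/// A minimal usage sketch (the catalog/store/config bindings are assumptions,
/// not defined in this file):
///
/// ```ignore
/// let table: StateTable<_> = StateTableBuilder::new(&table_catalog, store, Some(vnodes))
///     .with_op_consistency_level(StateTableOpConsistencyLevel::ConsistentOldValue)
///     .enable_preload_all_rows_by_config(&streaming_config)
///     .build()
///     .await;
/// ```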
pub struct StateTableBuilder<'a, S, SD, const IS_REPLICATED: bool, PreloadAllRow> {
    table_catalog: &'a Table,
    store: S,
    vnodes: Option<Arc<Bitmap>>,
    op_consistency_level: Option<StateTableOpConsistencyLevel>,
    output_column_ids: Option<Vec<ColumnId>>,
    preload_all_rows: PreloadAllRow,
    enable_vnode_key_pruning: Option<bool>,
    metrics: Option<StateTableMetrics>,

    _serde: PhantomData<SD>,
}

impl<'a, S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
    StateTableBuilder<'a, S, SD, IS_REPLICATED, ()>
{
    pub fn new(table_catalog: &'a Table, store: S, vnodes: Option<Arc<Bitmap>>) -> Self {
        Self {
            table_catalog,
            store,
            vnodes,
            op_consistency_level: None,
            output_column_ids: None,
            preload_all_rows: (),
            enable_vnode_key_pruning: None,
            metrics: None,
            _serde: Default::default(),
        }
    }

    fn with_preload_all_rows(
        self,
        preload_all_rows: bool,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, bool> {
        StateTableBuilder {
            table_catalog: self.table_catalog,
            store: self.store,
            vnodes: self.vnodes,
            op_consistency_level: self.op_consistency_level,
            output_column_ids: self.output_column_ids,
            preload_all_rows,
            enable_vnode_key_pruning: self.enable_vnode_key_pruning,
            metrics: self.metrics,
            _serde: Default::default(),
        }
    }

    pub fn enable_preload_all_rows_by_config(
        self,
        config: &StreamingConfig,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, bool> {
        let developer = &config.developer;
        let preload_all_rows = if developer.default_enable_mem_preload_state_table {
            !developer
                .mem_preload_state_table_ids_blacklist
                .contains(&self.table_catalog.id.as_raw_id())
        } else {
            developer
                .mem_preload_state_table_ids_whitelist
                .contains(&self.table_catalog.id.as_raw_id())
        };
        self.with_preload_all_rows(preload_all_rows)
    }

    pub fn forbid_preload_all_rows(self) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, bool> {
        self.with_preload_all_rows(false)
    }
}

impl<'a, S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool, PreloadAllRow>
    StateTableBuilder<'a, S, SD, IS_REPLICATED, PreloadAllRow>
{
    pub fn with_op_consistency_level(
        mut self,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> Self {
        self.op_consistency_level = Some(op_consistency_level);
        self
    }

    pub fn with_output_column_ids(mut self, output_column_ids: Vec<ColumnId>) -> Self {
        self.output_column_ids = Some(output_column_ids);
        self
    }

    pub fn enable_vnode_key_pruning(mut self, enable: bool) -> Self {
        self.enable_vnode_key_pruning = Some(enable);
        self
    }

    pub fn with_metrics(mut self, metrics: StateTableMetrics) -> Self {
        self.metrics = Some(metrics);
        self
    }
}

impl<'a, S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
    StateTableBuilder<'a, S, SD, IS_REPLICATED, bool>
{
    pub async fn build(self) -> StateTableInner<S, SD, IS_REPLICATED> {
        let mut preload_all_rows = self.preload_all_rows;
        if preload_all_rows
            && let Err(e) =
                risingwave_common::license::Feature::StateTableMemoryPreload.check_available()
        {
            warn!(table_id = %self.table_catalog.id, e = %e.as_report(), "table is configured to preload rows into memory, but the feature is unavailable under the current license");
            preload_all_rows = false;
        }

        let should_enable_vnode_key_pruning = if preload_all_rows
            && let Some(enable_vnode_key_pruning) = self.enable_vnode_key_pruning
            && enable_vnode_key_pruning
        {
            false
        } else {
            self.enable_vnode_key_pruning.unwrap_or(false)
        };

        StateTableInner::from_table_catalog_inner(
            self.table_catalog,
            self.store,
            self.vnodes,
            self.op_consistency_level
                .unwrap_or(StateTableOpConsistencyLevel::ConsistentOldValue),
            self.output_column_ids.unwrap_or_default(),
            preload_all_rows,
            should_enable_vnode_key_pruning,
            self.metrics,
        )
        .await
    }
}

impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    #[cfg(any(test, feature = "test"))]
    pub async fn from_table_catalog(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    pub async fn from_table_catalog_inconsistent_op(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .forbid_preload_all_rows()
            .build()
            .await
    }

738 #[allow(clippy::too_many_arguments)]
740 async fn from_table_catalog_inner(
741 table_catalog: &Table,
742 store: S,
743 vnodes: Option<Arc<Bitmap>>,
744 op_consistency_level: StateTableOpConsistencyLevel,
745 output_column_ids: Vec<ColumnId>,
746 preload_all_rows: bool,
747 enable_vnode_key_pruning: bool,
748 metrics: Option<StateTableMetrics>,
749 ) -> Self {
750 let table_id = table_catalog.id;
751 let table_columns: Vec<ColumnDesc> = table_catalog
752 .columns
753 .iter()
754 .map(|col| col.column_desc.as_ref().unwrap().into())
755 .collect();
756 let data_types: Vec<DataType> = table_catalog
757 .columns
758 .iter()
759 .map(|col| {
760 col.get_column_desc()
761 .unwrap()
762 .get_column_type()
763 .unwrap()
764 .into()
765 })
766 .collect();
767 let order_types: Vec<OrderType> = table_catalog
768 .pk
769 .iter()
770 .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
771 .collect();
772 let dist_key_indices: Vec<usize> = table_catalog
773 .distribution_key
774 .iter()
775 .map(|dist_index| *dist_index as usize)
776 .collect();
777
778 let pk_indices = table_catalog
779 .pk
780 .iter()
781 .map(|col_order| col_order.column_index as usize)
782 .collect_vec();
783
784 let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() {
786 get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices).unwrap()
787 } else {
788 table_catalog
789 .get_dist_key_in_pk()
790 .iter()
791 .map(|idx| *idx as usize)
792 .collect()
793 };
794
795 let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
796 let vnode_col_idx = *idx as usize;
797 pk_indices.iter().position(|&i| vnode_col_idx == i)
798 });
799
800 let distribution =
801 TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
802 assert_eq!(
803 distribution.vnode_count(),
804 table_catalog.vnode_count(),
805 "vnode count mismatch, scanning table {} under wrong distribution?",
806 table_catalog.name,
807 );
808
809 let pk_data_types = pk_indices
810 .iter()
811 .map(|i| table_columns[*i].data_type.clone())
812 .collect();
813 let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);
814
815 let input_value_indices = table_catalog
816 .value_indices
817 .iter()
818 .map(|val| *val as usize)
819 .collect_vec();
820
821 let no_shuffle_value_indices = (0..table_columns.len()).collect_vec();
822
823 let value_indices = match input_value_indices.len() == table_columns.len()
825 && input_value_indices == no_shuffle_value_indices
826 {
827 true => None,
828 false => Some(input_value_indices),
829 };
830 let prefix_hint_len = table_catalog.read_prefix_len_hint as usize;
831
832 let row_serde = Arc::new(SD::new(
833 Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
834 Arc::from(table_columns.clone().into_boxed_slice()),
835 ));
836
837 let state_table_op_consistency_level = op_consistency_level;
838 let op_consistency_level = match op_consistency_level {
839 StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
840 StateTableOpConsistencyLevel::ConsistentOldValue => {
841 consistent_old_value_op(row_serde.clone(), false)
842 }
843 StateTableOpConsistencyLevel::LogStoreEnabled => {
844 consistent_old_value_op(row_serde.clone(), true)
845 }
846 };
847
848 let table_option = TableOption::new(table_catalog.retention_seconds);
849 let new_local_options = if IS_REPLICATED {
850 NewLocalOptions::new_replicated(
851 table_id,
852 op_consistency_level,
853 table_option,
854 distribution.vnodes().clone(),
855 )
856 } else {
857 NewLocalOptions::new(
858 table_id,
859 op_consistency_level,
860 table_option,
861 distribution.vnodes().clone(),
862 true,
863 )
864 };
865 let local_state_store = store.new_local(new_local_options).await;
866
867 assert_eq!(
873 table_catalog.version.is_some(),
874 row_serde.kind().is_column_aware()
875 );
876
877 let output_column_ids_to_input_idx = output_column_ids
879 .iter()
880 .enumerate()
881 .map(|(pos, id)| (*id, pos))
882 .collect::<HashMap<_, _>>();
883
884 let columns: Vec<ColumnDesc> = table_catalog
886 .columns
887 .iter()
888 .map(|c| c.column_desc.as_ref().unwrap().into())
889 .collect_vec();
890
891 let mut i2o_mapping = vec![None; columns.len()];
895 for (i, column) in columns.iter().enumerate() {
896 if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
897 i2o_mapping[i] = Some(*pos);
898 }
899 }
900 let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());
902
903 let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);
905 let clean_watermark_indices = table_catalog.get_clean_watermark_column_indices();
906 if clean_watermark_indices.len() > 1 {
907 unimplemented!("multiple clean watermark columns are not supported yet")
908 }
909 let clean_watermark_index = clean_watermark_indices.first().map(|&i| i as usize);
910 let watermark_serde = clean_watermark_index.map(|idx| {
911 let pk_idx = pk_indices.iter().position(|&i| i == idx);
912 let (watermark_serde, watermark_serde_type) = match pk_idx {
913 Some(0) => (pk_serde.index(0).into_owned(), WatermarkSerdeType::PkPrefix),
914 Some(pk_idx) => (
915 pk_serde.index(pk_idx).into_owned(),
916 WatermarkSerdeType::NonPkPrefix,
917 ),
918 None => (
919 OrderedRowSerde::new(
920 vec![data_types[idx].clone()],
921 vec![OrderType::ascending()],
922 ),
923 WatermarkSerdeType::Value,
924 ),
925 };
926 (watermark_serde, watermark_serde_type)
927 });
928
929 let max_watermark_of_vnodes = distribution
931 .vnodes()
932 .iter_vnodes()
933 .filter_map(|vnode| local_state_store.get_table_watermark(vnode))
934 .max();
935 let committed_watermark = if let Some((deser, _)) = watermark_serde.as_ref()
936 && let Some(max_watermark) = max_watermark_of_vnodes
937 {
938 let deserialized = deser.deserialize(&max_watermark).ok().and_then(|row| {
939 assert!(row.len() == 1);
940 row[0].clone()
941 });
942 if deserialized.is_none() {
943 tracing::error!(
944 vnodes = ?distribution.vnodes(),
945 watermark = ?max_watermark,
946 "Failed to deserialize persisted watermark from state store.",
947 );
948 }
949 deserialized
950 } else {
951 None
952 };
953
954 Self {
955 table_id,
956 row_store: StateTableRowStore {
957 all_rows: preload_all_rows.then(HashMap::new),
958 state_store: local_state_store,
959 row_serde,
960 pk_serde: pk_serde.clone(),
961 table_id,
962 vnode_stats: enable_vnode_key_pruning.then(HashMap::new),
964 metrics,
965 },
966 store,
967 epoch: None,
968 pk_serde,
969 pk_indices,
970 distribution,
971 prefix_hint_len,
972 value_indices,
973 pending_watermark: None,
974 committed_watermark,
975 watermark_serde,
976 data_types,
977 output_indices,
978 i2o_mapping,
979 op_consistency_level: state_table_op_consistency_level,
980 clean_watermark_index,
981 on_post_commit: false,
982 }
983 }
984
    pub fn get_data_types(&self) -> &[DataType] {
        &self.data_types
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    fn compute_prefix_vnode(&self, pk_prefix: &impl Row) -> VirtualNode {
        self.distribution
            .try_compute_vnode_by_pk_prefix(pk_prefix)
            .expect("For streaming, the given prefix must be enough to calculate the vnode")
    }

    pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
        self.distribution.compute_vnode_by_pk(pk)
    }

    pub fn pk_indices(&self) -> &[usize] {
        &self.pk_indices
    }

    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
        assert!(IS_REPLICATED);
        self.pk_indices
            .iter()
            .map(|&i| self.output_indices.iter().position(|&j| i == j))
            .collect()
    }

    pub fn pk_serde(&self) -> &OrderedRowSerde {
        &self.pk_serde
    }

    pub fn vnodes(&self) -> &Arc<Bitmap> {
        self.distribution.vnodes()
    }

    pub fn value_indices(&self) -> &Option<Vec<usize>> {
        &self.value_indices
    }

    pub fn is_consistent_op(&self) -> bool {
        matches!(
            self.op_consistency_level,
            StateTableOpConsistencyLevel::ConsistentOldValue
                | StateTableOpConsistencyLevel::LogStoreEnabled
        )
    }

    pub fn metrics(&self) -> Option<&StateTableMetrics> {
        self.row_store.metrics.as_ref()
    }
}

impl<S, SD> StateTableInner<S, SD, true>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub async fn new_replicated(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .with_output_column_ids(output_column_ids)
            .forbid_preload_all_rows()
            .build()
            .await
    }
}

impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
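    /// Point lookup of a full row by primary key. For replicated tables the
    /// row is additionally projected onto `output_indices`.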
    pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        let row = self.row_store.get(serialized_pk, prefix_hint).await?;
        match row {
            Some(row) => {
                if IS_REPLICATED {
                    let row = row.project(&self.output_indices);
                    Ok(Some(row.into_owned_row()))
                } else {
                    Ok(Some(row))
                }
            }
            None => Ok(None),
        }
    }

    pub async fn exists(&self, pk: impl Row) -> StreamExecutorResult<bool> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        self.row_store.exists(serialized_pk, prefix_hint).await
    }

    fn serialize_pk(&self, pk: &impl Row) -> TableKey<Bytes> {
        assert!(pk.len() <= self.pk_indices.len());
        serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk))
    }

    fn serialize_pk_and_get_prefix_hint(&self, pk: &impl Row) -> (TableKey<Bytes>, Option<Bytes>) {
        let serialized_pk = self.serialize_pk(&pk);
        let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() {
            Some(serialized_pk.slice(VirtualNode::SIZE..))
        } else {
            #[cfg(debug_assertions)]
            if self.prefix_hint_len != 0 {
                warn!(
                    "prefix_hint_len is not equal to pk.len(), may not be able to utilize bloom filter"
                );
            }
            None
        };
        (serialized_pk, prefix_hint)
    }
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn get(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<Option<OwnedRow>> {
        if let Some(m) = &self.metrics {
            m.get_count.inc();
        }
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode_bytes();
            return Ok(rows.get(&vnode).expect("covered vnode").get(&key).cloned());
        }

        if let Some(stats) = &self.vnode_stats
            && let (vnode, key) = key_bytes.split_vnode_bytes()
            && let Some(vnode_stat) = stats.get(&vnode)
            && vnode_stat.can_prune(&key)
        {
            if let Some(m) = &self.metrics {
                m.get_vnode_pruned_count.inc();
            }
            return Ok(None);
        }

        let read_options = ReadOptions {
            prefix_hint,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };

        self.state_store
            .on_key_value(key_bytes, read_options, move |_, value| {
                let row = self.row_serde.deserialize(value)?;
                Ok(OwnedRow::new(row))
            })
            .await
            .map_err(Into::into)
    }

    async fn exists(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<bool> {
        if let Some(m) = &self.metrics {
            m.get_count.inc();
        }
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode_bytes();
            return Ok(rows.get(&vnode).expect("covered vnode").contains_key(&key));
        }

        if let Some(stats) = &self.vnode_stats
            && let (vnode, key) = key_bytes.split_vnode_bytes()
            && let Some(vnode_stat) = stats.get(&vnode)
            && vnode_stat.can_prune(&key)
        {
            if let Some(m) = &self.metrics {
                m.get_vnode_pruned_count.inc();
            }
            return Ok(false);
        }

        let read_options = ReadOptions {
            prefix_hint,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };
        let result = self
            .state_store
            .on_key_value(key_bytes, read_options, move |_, _| Ok(()))
            .await?;
        Ok(result.is_some())
    }
}

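/// Handle returned by `commit`. Callers must drive
/// [`StateTablePostCommit::post_yield_barrier`] after yielding the barrier, so
/// that a pending vnode-bitmap update (on scaling) is applied at a
/// well-defined point between two epochs.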
#[must_use]
pub struct StateTablePostCommit<'a, S, SD = BasicSerde, const IS_REPLICATED: bool = false>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    inner: &'a mut StateTableInner<S, SD, IS_REPLICATED>,
}

impl<'a, S, SD, const IS_REPLICATED: bool> StateTablePostCommit<'a, S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub async fn post_yield_barrier(
        mut self,
        new_vnodes: Option<Arc<Bitmap>>,
    ) -> StreamExecutorResult<
        Option<(
            (
                Arc<Bitmap>,
                Arc<Bitmap>,
                &'a mut StateTableInner<S, SD, IS_REPLICATED>,
            ),
            bool,
        )>,
    > {
        self.inner.on_post_commit = false;
        Ok(if let Some(new_vnodes) = new_vnodes {
            let (old_vnodes, keyed_cache_may_stale) =
                self.update_vnode_bitmap(new_vnodes.clone()).await?;
            Some(((new_vnodes, old_vnodes, self.inner), keyed_cache_may_stale))
        } else {
            None
        })
    }

    pub fn inner(&self) -> &StateTableInner<S, SD, IS_REPLICATED> {
        &*self.inner
    }

    async fn update_vnode_bitmap(
        &mut self,
        new_vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<(Arc<Bitmap>, bool)> {
        let prev_vnodes = self
            .inner
            .row_store
            .update_vnode_bitmap(new_vnodes.clone())
            .await?;
        assert_eq!(
            &prev_vnodes,
            self.inner.vnodes(),
            "state table and state store vnode bitmap mismatch"
        );

        if self.inner.distribution.is_singleton() {
            assert_eq!(
                &new_vnodes,
                self.inner.vnodes(),
                "should not update vnode bitmap for singleton table"
            );
        }
        assert_eq!(self.inner.vnodes().len(), new_vnodes.len());

        let keyed_cache_may_stale = keyed_cache_may_stale(self.inner.vnodes(), &new_vnodes);

        if keyed_cache_may_stale {
            self.inner.pending_watermark = None;
        }

        Ok((
            self.inner.distribution.update_vnode_bitmap(new_vnodes),
            keyed_cache_may_stale,
        ))
    }
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    fn handle_mem_table_error(&self, e: StorageError) {
        let e = match e.into_inner() {
            ErrorKind::MemTable(e) => e,
            _ => unreachable!("should only get memtable error"),
        };
        match *e {
            MemTableError::InconsistentOperation { key, prev, new, .. } => {
                let (vnode, key) = deserialize_pk_with_vnode(&key, &self.pk_serde).unwrap();
                panic!(
                    "mem-table operation inconsistent! table_id: {}, vnode: {}, key: {:?}, prev: {}, new: {}",
                    self.table_id,
                    vnode,
                    &key,
                    prev.debug_fmt(&*self.row_serde),
                    new.debug_fmt(&*self.row_serde),
                )
            }
        }
    }

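    // Each write below does the same bookkeeping before hitting the mem-table:
    // widen `vnode_stats` with the touched key (widening is safe even on
    // delete, since pruning only needs the recorded span to cover all live
    // keys), and mirror the change into `all_rows` when preloading is enabled.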
    fn insert(&mut self, key: TableKey<Bytes>, value: impl Row) {
        insane_mode_discard_point!();
        let value_bytes = self.row_serde.serialize(&value).into();

        let (vnode, key_without_vnode) = key.split_vnode_bytes();

        if self.all_rows.is_none()
            && let Some(stats) = &mut self.vnode_stats
            && let Some(vnode_stat) = stats.get_mut(&vnode)
        {
            vnode_stat.update_with_key(&key_without_vnode);
        }

        if let Some(rows) = &mut self.all_rows {
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .insert(key_without_vnode, value.into_owned_row());
        }
        self.state_store
            .insert(key, value_bytes, None)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn delete(&mut self, key: TableKey<Bytes>, value: impl Row) {
        insane_mode_discard_point!();
        let value_bytes = self.row_serde.serialize(value).into();

        let (vnode, key_without_vnode) = key.split_vnode_bytes();

        if self.all_rows.is_none()
            && let Some(stats) = &mut self.vnode_stats
            && let Some(vnode_stat) = stats.get_mut(&vnode)
        {
            vnode_stat.update_with_key(&key_without_vnode);
        }

        if let Some(rows) = &mut self.all_rows {
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .remove(&key_without_vnode);
        }
        self.state_store
            .delete(key, value_bytes)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn update(&mut self, key_bytes: TableKey<Bytes>, old_value: impl Row, new_value: impl Row) {
        insane_mode_discard_point!();
        let new_value_bytes = self.row_serde.serialize(&new_value).into();
        let old_value_bytes = self.row_serde.serialize(old_value).into();

        let (vnode, key_without_vnode) = key_bytes.split_vnode_bytes();

        if self.all_rows.is_none()
            && let Some(stats) = &mut self.vnode_stats
            && let Some(vnode_stat) = stats.get_mut(&vnode)
        {
            vnode_stat.update_with_key(&key_without_vnode);
        }

        if let Some(rows) = &mut self.all_rows {
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .insert(key_without_vnode, new_value.into_owned_row());
        }
        self.state_store
            .insert(key_bytes, new_value_bytes, Some(old_value_bytes))
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }
}

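// Public write API of the state table. A minimal sketch of the expected call
// pattern inside an executor (the row/epoch bindings are assumptions, not
// defined in this file):
//
//     state_table.insert(new_row);              // pk is projected from the row
//     state_table.update(old_row, new_row);     // pk must stay unchanged
//     state_table.delete(old_row);
//     let post_commit = state_table.commit(new_epoch).await?;
//     post_commit.post_yield_barrier(None).await?;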
impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub fn insert(&mut self, value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&value).project(pk_indices);

        let key_bytes = self.serialize_pk(&pk);
        dispatch_value_indices!(&self.value_indices, [value], {
            self.row_store.insert(key_bytes, value)
        })
    }

    pub fn delete(&mut self, old_value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&old_value).project(pk_indices);

        let key_bytes = self.serialize_pk(&pk);
        dispatch_value_indices!(&self.value_indices, [old_value], {
            self.row_store.delete(key_bytes, old_value)
        })
    }

    pub fn update(&mut self, old_value: impl Row, new_value: impl Row) {
        let old_pk = (&old_value).project(self.pk_indices());
        let new_pk = (&new_value).project(self.pk_indices());
        debug_assert!(
            Row::eq(&old_pk, new_pk),
            "pk should not change: {old_pk:?} vs {new_pk:?}. {}",
            self.table_id
        );

        let key_bytes = self.serialize_pk(&new_pk);
        dispatch_value_indices!(&self.value_indices, [old_value, new_value], {
            self.row_store.update(key_bytes, old_value, new_value)
        })
    }

    pub fn write_record(&mut self, record: Record<impl Row>) {
        match record {
            Record::Insert { new_row } => self.insert(new_row),
            Record::Delete { old_row } => self.delete(old_row),
            Record::Update { old_row, new_row } => self.update(old_row, new_row),
        }
    }

    fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
        fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
    }

    #[allow(clippy::disallowed_methods)]
    pub fn write_chunk(&mut self, chunk: StreamChunk) {
        let chunk = if IS_REPLICATED {
            self.fill_non_output_indices(chunk)
        } else {
            chunk
        };

        let vnodes = self
            .distribution
            .compute_chunk_vnode(&chunk, &self.pk_indices);

        for (idx, optional_row) in chunk.rows_with_holes().enumerate() {
            let Some((op, row)) = optional_row else {
                continue;
            };
            let pk = row.project(&self.pk_indices);
            let vnode = vnodes[idx];
            let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, vnode);
            match op {
                Op::Insert | Op::UpdateInsert => {
                    dispatch_value_indices!(&self.value_indices, [row], {
                        self.row_store.insert(key_bytes, row);
                    });
                }
                Op::Delete | Op::UpdateDelete => {
                    dispatch_value_indices!(&self.value_indices, [row], {
                        self.row_store.delete(key_bytes, row);
                    });
                }
            }
        }
    }

    pub fn update_watermark(&mut self, watermark: ScalarImpl) {
        trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark");
        self.pending_watermark = Some(watermark);
    }

    pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> {
        self.committed_watermark.as_ref()
    }

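    /// Commits all dirty state to the state store and seals `new_epoch`. Any
    /// pending state-cleaning watermark is attached to the seal. The returned
    /// [`StateTablePostCommit`] must be driven after the barrier is yielded.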
    pub async fn commit(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
        self.commit_inner(new_epoch, None).await
    }

    #[cfg(test)]
    pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
        self.commit_assert_no_update_vnode_bitmap(new_epoch).await
    }

    pub async fn commit_assert_no_update_vnode_bitmap(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<()> {
        let post_commit = self.commit_inner(new_epoch, None).await?;
        post_commit.post_yield_barrier(None).await?;
        Ok(())
    }

    pub async fn commit_may_switch_consistent_op(
        &mut self,
        new_epoch: EpochPair,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
        if self.op_consistency_level != op_consistency_level {
            if !cfg!(debug_assertions) {
                info!(
                    ?new_epoch,
                    prev_op_consistency_level = ?self.op_consistency_level,
                    ?op_consistency_level,
                    table_id = %self.table_id,
                    "switch to new op consistency level"
                );
            }
            self.commit_inner(new_epoch, Some(op_consistency_level))
                .await
        } else {
            self.commit_inner(new_epoch, None).await
        }
    }

    async fn commit_inner(
        &mut self,
        new_epoch: EpochPair,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
        assert!(!self.on_post_commit);
        assert_eq!(
            self.epoch.expect("should only be called after init").curr,
            new_epoch.prev
        );
        if let Some(new_consistency_level) = switch_consistent_op {
            assert_ne!(self.op_consistency_level, new_consistency_level);
            self.op_consistency_level = new_consistency_level;
        }
        trace!(
            table_id = %self.table_id,
            epoch = ?self.epoch,
            "commit state table"
        );

        let table_watermarks = self.commit_pending_watermark();
        self.row_store
            .seal_current_epoch(new_epoch.curr, table_watermarks, switch_consistent_op)
            .await?;
        self.epoch = Some(new_epoch);

        self.on_post_commit = true;
        Ok(StateTablePostCommit { inner: self })
    }

    fn commit_pending_watermark(
        &mut self,
    ) -> Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)> {
        let watermark = self.pending_watermark.take()?;
        trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning");

        assert!(
            !self.pk_indices().is_empty(),
            "should not have a pending watermark on an empty pk"
        );
        let (watermark_serializer, watermark_type) = self
            .watermark_serde
            .as_ref()
            .expect("watermark serde should be initialized to commit watermark");
        let watermark_suffix =
            serialize_row(row::once(Some(watermark.clone())), watermark_serializer);
        let vnode_watermark = VnodeWatermark::new(
            self.vnodes().clone(),
            Bytes::copy_from_slice(watermark_suffix.as_ref()),
        );
        trace!(table_id = %self.table_id, ?vnode_watermark, "table watermark");

        let order_type = watermark_serializer.get_order_types().get(0).unwrap();
        let direction = if order_type.is_ascending() {
            WatermarkDirection::Ascending
        } else {
            WatermarkDirection::Descending
        };

        self.committed_watermark = Some(watermark);
        Some((direction, vec![vnode_watermark], *watermark_type))
    }

    pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.row_store.try_flush().await?;
        Ok(())
    }
}

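// Convenience trait aliases for the row streams returned by the iteration
// APIs below; the blanket impls make any stream with a matching item type
// qualify.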
pub trait RowStream<'a>: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a {}
impl<'a, S: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a> RowStream<'a> for S {}

pub trait KeyedRowStream<'a>: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a {}
impl<'a, S: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a> KeyedRowStream<'a> for S {}

pub trait PkRowStream<'a, K>: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a {}
impl<'a, K, S: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a> PkRowStream<'a, K> for S {}

pub type BoxedRowStream<'a> = BoxStream<'a, StreamExecutorResult<OwnedRow>>;

pub trait FromVnodeBytes {
    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self;
}

impl FromVnodeBytes for Bytes {
    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self {
        prefix_slice_with_vnode(vnode, bytes)
    }
}

impl FromVnodeBytes for () {
    fn from_vnode_bytes(_vnode: VirtualNode, _bytes: &Bytes) -> Self {}
}

impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
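    /// Iterates over the rows of a single vnode within the given pk range,
    /// yielding `OwnedRow`s in pk order.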
    pub async fn iter_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_kv_with_pk_range::<()>(pk_range, vnode, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

    pub async fn iter_keyed_row_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    pub async fn iter_with_vnode_and_output_indices(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        assert!(IS_REPLICATED);
        let stream = self
            .iter_with_vnode(vnode, pk_range, prefetch_options)
            .await?;
        Ok(stream.map(|row| row.map(|row| row.project(&self.output_indices).into_owned_row())))
    }
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn iter_kv<K: CopyFromSlice + FromVnodeBytes>(
        &self,
        vnode: VirtualNode,
        (start, end): (Bound<Bytes>, Bound<Bytes>),
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        if let Some(m) = &self.metrics {
            m.iter_count.inc();
        }
        let (pruned_start, pruned_end) = if let Some(stats) = &self.vnode_stats
            && let Some(vnode_stat) = stats.get(&vnode)
        {
            match vnode_stat.pruned_key_range(&start, &end) {
                Some((new_start, new_end)) => (new_start, new_end),
                None => {
                    if let Some(m) = &self.metrics {
                        m.iter_vnode_pruned_count.inc();
                    }
                    return Ok(futures::future::Either::Left(futures::stream::empty()));
                }
            }
        } else {
            (start, end)
        };

        if let Some(rows) = &self.all_rows {
            return Ok(futures::future::Either::Right(
                futures::future::Either::Left(futures::stream::iter(
                    rows.get(&vnode)
                        .expect("covered vnode")
                        .range((pruned_start, pruned_end))
                        .map(move |(key, value)| {
                            Ok((K::from_vnode_bytes(vnode, key), value.clone()))
                        }),
                )),
            ));
        }
        let read_options = ReadOptions {
            prefix_hint,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(futures::future::Either::Right(
            futures::future::Either::Right(deserialize_keyed_row_stream(
                self.state_store
                    .iter(
                        prefixed_range_with_vnode((pruned_start, pruned_end), vnode),
                        read_options,
                    )
                    .await?,
                &*self.row_serde,
            )),
        ))
    }

    async fn rev_iter_kv<K: CopyFromSlice + FromVnodeBytes>(
        &self,
        vnode: VirtualNode,
        (start, end): (Bound<Bytes>, Bound<Bytes>),
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        if let Some(m) = &self.metrics {
            m.iter_count.inc();
        }
        let (pruned_start, pruned_end) = if let Some(stats) = &self.vnode_stats
            && let Some(vnode_stat) = stats.get(&vnode)
        {
            match vnode_stat.pruned_key_range(&start, &end) {
                Some((new_start, new_end)) => (new_start, new_end),
                None => {
                    if let Some(m) = &self.metrics {
                        m.iter_vnode_pruned_count.inc();
                    }
                    return Ok(futures::future::Either::Left(futures::stream::empty()));
                }
            }
        } else {
            (start, end)
        };

        if let Some(rows) = &self.all_rows {
            return Ok(futures::future::Either::Right(
                futures::future::Either::Left(futures::stream::iter(
                    rows.get(&vnode)
                        .expect("covered vnode")
                        .range((pruned_start, pruned_end))
                        .rev()
                        .map(move |(key, value)| {
                            Ok((K::from_vnode_bytes(vnode, key), value.clone()))
                        }),
                )),
            ));
        }
        let read_options = ReadOptions {
            prefix_hint,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(futures::future::Either::Right(
            futures::future::Either::Right(deserialize_keyed_row_stream(
                self.state_store
                    .rev_iter(
                        prefixed_range_with_vnode((pruned_start, pruned_end), vnode),
                        read_options,
                    )
                    .await?,
                &*self.row_serde,
            )),
        ))
    }
}

impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
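    /// Iterates over rows whose pk starts with `pk_prefix`, optionally further
    /// restricted by `sub_range` over the remaining pk columns. A minimal
    /// usage sketch (the `state_table` and `key` bindings are assumptions):
    ///
    /// ```ignore
    /// let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
    /// let stream = state_table
    ///     .iter_with_prefix(row::once(Some(key)), sub_range, Default::default())
    ///     .await?;
    /// pin_mut!(stream);
    /// while let Some(row) = stream.try_next().await? {
    ///     // rows arrive in pk order under the prefix
    /// }
    /// ```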
    pub async fn iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        let stream = self
            .iter_with_prefix_inner::<false, ()>(pk_prefix, sub_range, prefetch_options)
            .await?;
        Ok(stream.map_ok(|(_, row)| row))
    }

    pub async fn iter_with_prefix_respecting_watermark(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<BoxedRowStream<'_>> {
        let vnode = self.compute_prefix_vnode(&pk_prefix);
        let stream = self
            .iter_with_prefix(pk_prefix, sub_range, prefetch_options)
            .await?;
        let Some(clean_watermark_index) = self.clean_watermark_index else {
            return Ok(stream.boxed());
        };
        let Some((watermark_serde, watermark_type)) = &self.watermark_serde else {
            return Err(StreamExecutorError::from(anyhow!(
                "Missing watermark serde"
            )));
        };
        if matches!(watermark_type, WatermarkSerdeType::PkPrefix) {
            return Ok(stream.boxed());
        }
        let watermark_bytes = self.row_store.state_store.get_table_watermark(vnode);
        let Some(watermark_bytes) = watermark_bytes else {
            return Ok(stream.boxed());
        };
        let watermark_row = watermark_serde.deserialize(&watermark_bytes)?;
        if watermark_row.len() != 1 {
            return Err(StreamExecutorError::from(anyhow!(
                "Watermark row should have exactly 1 column, got {}",
                watermark_row.len()
            )));
        }
        let watermark_value = watermark_row[0].clone();
        if watermark_value.is_none() {
            return Err(StreamExecutorError::from(anyhow!(
                "Watermark cannot be NULL"
            )));
        }
        let order_type = watermark_serde.get_order_types().get(0).ok_or_else(|| {
            StreamExecutorError::from(anyhow!(
                "Watermark serde should have at least one order type"
            ))
        })?;
        let direction = if order_type.is_ascending() {
            WatermarkDirection::Ascending
        } else {
            WatermarkDirection::Descending
        };
        let stream = stream.try_filter_map(move |row| {
            let watermark_col_value = row.datum_at(clean_watermark_index);
            let should_filter = direction.datum_filter_by_watermark(
                watermark_col_value,
                &watermark_value,
                *order_type,
            );
            if should_filter {
                ready(Ok(None))
            } else {
                ready(Ok(Some(row)))
            }
        });
        Ok(stream.boxed())
    }

    pub async fn get_from_one_row_table(&self) -> StreamExecutorResult<Option<OwnedRow>> {
        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
        let stream = self
            .iter_with_prefix(row::empty(), sub_range, Default::default())
            .await?;
        pin_mut!(stream);

        if let Some(res) = stream.next().await {
            let value = res?.into_owned_row();
            assert!(stream.next().await.is_none());
            Ok(Some(value))
        } else {
            Ok(None)
        }
    }

    pub async fn get_from_one_value_table(&self) -> StreamExecutorResult<Option<ScalarImpl>> {
        Ok(self
            .get_from_one_row_table()
            .await?
            .and_then(|row| row[0].clone()))
    }

    pub async fn iter_keyed_row_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::<false, Bytes>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    pub async fn rev_iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::<true, ()>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

    async fn iter_with_prefix_inner<const REVERSE: bool, K: CopyFromSlice + FromVnodeBytes>(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
        let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);

        let vnode = self.compute_prefix_vnode(&pk_prefix);

        let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
        if self.prefix_hint_len != 0 {
            debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
        }
        let prefix_hint = {
            if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() {
                None
            } else {
                let encoded_prefix_len = self
                    .pk_serde
                    .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;

                Some(Bytes::copy_from_slice(
                    &encoded_prefix[..encoded_prefix_len],
                ))
            }
        };

        trace!(
            table_id = %self.table_id(),
            ?prefix_hint, ?pk_prefix,
            ?pk_prefix_indices,
            iter_direction = if REVERSE { "reverse" } else { "forward" },
            "storage_iter_with_prefix"
        );

        let memcomparable_range =
            prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);

        Ok(if REVERSE {
            futures::future::Either::Left(
                self.row_store
                    .rev_iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
                    .await?,
            )
        } else {
            futures::future::Either::Right(
                self.row_store
                    .iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
                    .await?,
            )
        })
    }

    async fn iter_kv_with_pk_range<'a, K: CopyFromSlice + FromVnodeBytes>(
        &'a self,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'a, K>> {
        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);

        self.row_store
            .iter_kv(vnode, memcomparable_range, None, prefetch_options)
            .await
    }
}

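// Decodes a raw state-store iterator into a stream of `(key, row)` pairs,
// where `K` selects whether the table key is materialized (`Bytes`) or
// dropped (`()`).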
fn deserialize_keyed_row_stream<'a, K: CopyFromSlice>(
    iter: impl StateStoreIter + 'a,
    deserializer: &'a impl ValueRowSerde,
) -> impl PkRowStream<'a, K> {
    iter.into_stream(move |(key, value)| {
        Ok((
            K::copy_from_slice(key.user_key.table_key.as_ref()),
            deserializer.deserialize(value).map(OwnedRow::new)?,
        ))
    })
    .map_err(Into::into)
}

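// Helpers that turn pk-space bounds into memcomparable byte bounds. Note the
// asymmetry for prefix bounds: an included end bound over a pk *prefix* must
// cover every full key sharing that prefix, hence `end_bound_of_prefix` (and
// `start_bound_of_excluded_prefix` for an excluded start) rather than plain
// byte comparisons.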
pub fn prefix_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    range: &(Bound<impl Row>, Bound<impl Row>),
) -> (Bound<Bytes>, Bound<Bytes>) {
    (
        start_range_to_memcomparable(pk_serde, &range.0),
        end_range_to_memcomparable(pk_serde, &range.1, None),
    )
}

fn prefix_and_sub_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    sub_range: &(Bound<impl Row>, Bound<impl Row>),
    pk_prefix: impl Row,
) -> (Bound<Bytes>, Bound<Bytes>) {
    let (range_start, range_end) = sub_range;
    let prefix_serializer = pk_serde.prefix(pk_prefix.len());
    let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
    let start_range = match range_start {
        Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))),
        Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))),
        Unbounded => Bound::Included(Either::Right(&pk_prefix)),
    };
    let end_range = match range_end {
        Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)),
        Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)),
        Unbounded => Unbounded,
    };
    (
        start_range_to_memcomparable(pk_serde, &start_range),
        end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
    )
}

fn start_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => Unbounded,
        Included(r) => {
            let serialized = serialize_pk_prefix(r);
            Included(serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            start_bound_of_excluded_prefix(&serialized)
        }
    }
}

fn end_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
    serialized_pk_prefix: Option<Bytes>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => match serialized_pk_prefix {
            Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix),
            None => Unbounded,
        },
        Included(r) => {
            let serialized = serialize_pk_prefix(r);
            end_bound_of_prefix(&serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            Excluded(serialized)
        }
    }
}

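// For replicated tables the chunk only carries the output columns; rebuild a
// full-schema chunk by cloning mapped columns and padding every non-output
// column with NULLs, so pk projection and the serde see the complete layout.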
fn fill_non_output_indices(
    i2o_mapping: &ColIndexMapping,
    data_types: &[DataType],
    chunk: StreamChunk,
) -> StreamChunk {
    let cardinality = chunk.cardinality();
    let (ops, columns, vis) = chunk.into_inner();
    let mut full_columns = Vec::with_capacity(data_types.len());
    for (i, data_type) in data_types.iter().enumerate() {
        if let Some(j) = i2o_mapping.try_map(i) {
            full_columns.push(columns[j].clone());
        } else {
            let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
            column_builder.append_n_null(cardinality);
            let column: ArrayRef = column_builder.finish().into();
            full_columns.push(column)
        }
    }
    let data_chunk = DataChunk::new(full_columns, vis);
    StreamChunk::from_parts(ops, data_chunk)
}

#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    fn check(actual: impl Debug, expect: Expect) {
        let actual = format!("{:#?}", actual);
        expect.assert_eq(&actual);
    }

    #[test]
    fn test_fill_non_output_indices() {
        let data_types = vec![DataType::Int32, DataType::Int32, DataType::Int32];
        let replicated_chunk = [OwnedRow::new(vec![
            Some(222_i32.into()),
            Some(2_i32.into()),
        ])];
        let replicated_chunk = StreamChunk::from_parts(
            vec![Op::Insert],
            DataChunk::from_rows(&replicated_chunk, &[DataType::Int32, DataType::Int32]),
        );
        let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
        let filled_chunk = fill_non_output_indices(&i2o_mapping, &data_types, replicated_chunk);
        check(
            filled_chunk,
            expect![[r#"
                StreamChunk { cardinality: 1, capacity: 1, data:
                +---+---+---+-----+
                | + | 2 |   | 222 |
                +---+---+---+-----+
                }"#]],
        );
    }
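
    /// A minimal sketch (added here, not part of the original test suite)
    /// exercising `VnodeStatistics`: keys outside the observed [min, max]
    /// span are prunable, and overlapping ranges are clamped to it.
    #[test]
    fn test_vnode_statistics_pruning() {
        let mut stats = VnodeStatistics::new();
        stats.update_with_key(&Bytes::from_static(b"b"));
        stats.update_with_key(&Bytes::from_static(b"d"));

        // Point lookups outside [b, d] can be pruned.
        assert!(stats.can_prune(&Bytes::from_static(b"a")));
        assert!(!stats.can_prune(&Bytes::from_static(b"c")));

        // A range ending strictly below the min is prunable as a whole.
        assert!(stats.can_prune_range(
            &Included(Bytes::from_static(b"0")),
            &Excluded(Bytes::from_static(b"a")),
        ));

        // An overlapping range is clamped to the observed bounds.
        let (start, end) = stats
            .pruned_key_range(
                &Included(Bytes::from_static(b"a")),
                &Included(Bytes::from_static(b"c")),
            )
            .unwrap();
        assert_eq!(start, Included(Bytes::from_static(b"b")));
        assert_eq!(end, Included(Bytes::from_static(b"c")));
    }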
}