risingwave_stream/from_proto/
agg_common.rs1use std::collections::HashMap;
16use std::sync::Arc;
17
18use risingwave_common::bitmap::Bitmap;
19use risingwave_common::config::StreamingConfig;
20use risingwave_common::util::sort_util::ColumnOrder;
21
22use super::*;
23use crate::common::StateTableColumnMapping;
24use crate::common::table::state_table::{StateTable, StateTableBuilder};
25use crate::executor::aggregate::AggStateStorage;
26
27pub async fn build_agg_state_storages_from_proto<S: StateStore>(
30 agg_call_states: &[risingwave_pb::stream_plan::AggCallState],
31 store: S,
32 vnodes: Option<Arc<Bitmap>>,
33 streaming_config: &StreamingConfig,
34) -> Vec<AggStateStorage<S>> {
35 use risingwave_pb::stream_plan::agg_call_state;
36
37 let mut result = vec![];
38 for agg_call_state in agg_call_states {
39 let agg_state_store = match agg_call_state.get_inner().unwrap() {
40 agg_call_state::Inner::ValueState(..) => AggStateStorage::Value,
41 agg_call_state::Inner::MaterializedInputState(state) => {
42 let table = StateTableBuilder::new(
43 state.get_table().unwrap(),
44 store.clone(),
45 vnodes.clone(),
46 )
47 .enable_preload_all_rows_by_config(streaming_config)
48 .build()
49 .await;
50 let mapping = StateTableColumnMapping::new(
51 state
52 .get_included_upstream_indices()
53 .iter()
54 .map(|idx| *idx as usize)
55 .collect(),
56 Some(
57 state
58 .get_table_value_indices()
59 .iter()
60 .map(|idx| *idx as usize)
61 .collect(),
62 ),
63 );
64 let order_columns = state
65 .order_columns
66 .iter()
67 .map(ColumnOrder::from_protobuf)
68 .collect();
69 AggStateStorage::MaterializedInput {
70 table,
71 mapping,
72 order_columns,
73 }
74 }
75 };
76
77 result.push(agg_state_store)
78 }
79
80 result
81}
82
83pub async fn build_distinct_dedup_table_from_proto<S: StateStore>(
84 dedup_tables: &HashMap<u32, risingwave_pb::catalog::Table>,
85 store: S,
86 vnodes: Option<Arc<Bitmap>>,
87 streaming_config: &StreamingConfig,
88) -> HashMap<usize, StateTable<S>> {
89 if dedup_tables.is_empty() {
90 return HashMap::new();
91 }
92 futures::future::join_all(dedup_tables.iter().map(|(distinct_col, table_pb)| async {
93 let table = StateTableBuilder::new(table_pb, store.clone(), vnodes.clone())
94 .enable_preload_all_rows_by_config(streaming_config)
95 .build()
96 .await;
97 (*distinct_col as usize, table)
98 }))
99 .await
100 .into_iter()
101 .collect()
102}